Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validator recording fix #2660

Merged
merged 17 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions execution/gethexec/block_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
flag "github.com/spf13/pflag"
)

// BlockRecorder uses a separate state database from the blockchain.
Expand All @@ -25,6 +26,8 @@ import (
// Most recent/advanced header we ever computed (lastHdr)
// Hopefully - some recent valid block. For that we always keep one candidate block until it becomes validated.
type BlockRecorder struct {
config *BlockRecorderConfig

recordingDatabase *arbitrum.RecordingDatabase
execEngine *ExecutionEngine

Expand All @@ -39,10 +42,33 @@ type BlockRecorder struct {
preparedLock sync.Mutex
}

func NewBlockRecorder(config *arbitrum.RecordingDatabaseConfig, execEngine *ExecutionEngine, ethDb ethdb.Database) *BlockRecorder {
// BlockRecorderConfig configures the BlockRecorder's separate recording
// database and its cache of prepared block references.
//
// TrieDirtyCache and TrieCleanCache mirror the like-named caching options of
// the main blockchain database, but apply to the recording database used for
// validation. MaxPrepared bounds how many prepared references the recorder
// keeps (see PrepareForRecord).
type BlockRecorderConfig struct {
	TrieDirtyCache int `koanf:"trie-dirty-cache"`
	TrieCleanCache int `koanf:"trie-clean-cache"`
	MaxPrepared    int `koanf:"max-prepared"`
}

// DefaultBlockRecorderConfig holds the default values for the block
// recorder: cache sizes for the recording database and the cap on
// prepared references kept between validations.
var DefaultBlockRecorderConfig = BlockRecorderConfig{
	TrieDirtyCache: 1024,
	TrieCleanCache: 16,
	MaxPrepared:    1000,
}

// BlockRecorderConfigAddOptions registers the block-recorder command-line
// flags under the given prefix on the flag set f, using
// DefaultBlockRecorderConfig for the default values.
func BlockRecorderConfigAddOptions(prefix string, f *flag.FlagSet) {
	defaults := DefaultBlockRecorderConfig
	f.Int(prefix+".trie-dirty-cache", defaults.TrieDirtyCache, "like trie-dirty-cache for the separate, recording database (used for validation)")
	f.Int(prefix+".trie-clean-cache", defaults.TrieCleanCache, "like trie-clean-cache for the separate, recording database (used for validation)")
	f.Int(prefix+".max-prepared", defaults.MaxPrepared, "max references to store in the recording database")
}

func NewBlockRecorder(config *BlockRecorderConfig, execEngine *ExecutionEngine, ethDb ethdb.Database) *BlockRecorder {
dbConfig := arbitrum.RecordingDatabaseConfig{
TrieDirtyCache: config.TrieDirtyCache,
TrieCleanCache: config.TrieCleanCache,
}
recorder := &BlockRecorder{
config: config,
execEngine: execEngine,
recordingDatabase: arbitrum.NewRecordingDatabase(config, ethDb, execEngine.bc),
recordingDatabase: arbitrum.NewRecordingDatabase(&dbConfig, ethDb, execEngine.bc),
}
execEngine.SetRecorder(recorder)
return recorder
Expand Down Expand Up @@ -303,7 +329,7 @@ func (r *BlockRecorder) PrepareForRecord(ctx context.Context, start, end arbutil
r.updateLastHdr(header)
hdrNum++
}
r.preparedAddTrim(references, 1000)
r.preparedAddTrim(references, r.config.MaxPrepared)
return nil
}

Expand Down
30 changes: 15 additions & 15 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,19 @@ func StylusTargetConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type Config struct {
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
Forwarder ForwarderConfig `koanf:"forwarder"`
ForwardingTarget string `koanf:"forwarding-target"`
SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"`
Caching CachingConfig `koanf:"caching"`
RPC arbitrum.Config `koanf:"rpc"`
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
StylusTarget StylusTargetConfig `koanf:"stylus-target"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
RecordingDatabase BlockRecorderConfig `koanf:"recording-database"`
TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
Forwarder ForwarderConfig `koanf:"forwarder"`
ForwardingTarget string `koanf:"forwarding-target"`
SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"`
Caching CachingConfig `koanf:"caching"`
RPC arbitrum.Config `koanf:"rpc"`
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
StylusTarget StylusTargetConfig `koanf:"stylus-target"`

forwardingTarget string
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
arbitrum.ConfigAddOptions(prefix+".rpc", f)
SequencerConfigAddOptions(prefix+".sequencer", f)
headerreader.AddOptions(prefix+".parent-chain-reader", f)
arbitrum.RecordingDatabaseConfigAddOptions(prefix+".recording-database", f)
BlockRecorderConfigAddOptions(prefix+".recording-database", f)
f.String(prefix+".forwarding-target", ConfigDefault.ForwardingTarget, "transaction forwarding target URL, or \"null\" to disable forwarding (iff not sequencer)")
f.StringSlice(prefix+".secondary-forwarding-target", ConfigDefault.SecondaryForwardingTarget, "secondary transaction forwarding target URL")
AddOptionsForNodeForwarderConfig(prefix+".forwarder", f)
Expand All @@ -139,7 +139,7 @@ var ConfigDefault = Config{
RPC: arbitrum.DefaultConfig,
Sequencer: DefaultSequencerConfig,
ParentChainReader: headerreader.DefaultConfig,
RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig,
RecordingDatabase: DefaultBlockRecorderConfig,
ForwardingTarget: "",
SecondaryForwardingTarget: []string{},
TxPreChecker: DefaultTxPreCheckerConfig,
Expand Down
71 changes: 54 additions & 17 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ type BlockValidator struct {
chainCaughtUp bool

// can only be accessed from creation thread or if holding reorg-write
nextCreateBatch []byte
nextCreateBatchBlockHash common.Hash
nextCreateBatchMsgCount arbutil.MessageIndex
nextCreateBatchReread bool
nextCreateStartGS validator.GoGlobalState
nextCreatePrevDelayed uint64
nextCreateBatch *FullBatchInfo
nextCreateBatchReread bool
prevBatchCache map[uint64]*FullBatchInfo

nextCreateStartGS validator.GoGlobalState
nextCreatePrevDelayed uint64

// can only be accessed from from validation thread or if holding reorg-write
lastValidGS validator.GoGlobalState
Expand Down Expand Up @@ -108,6 +108,7 @@ type BlockValidatorConfig struct {
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
RecordingIterLimit uint64 `koanf:"recording-iter-limit"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
BatchCacheLimit uint32 `koanf:"batch-cache-limit"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Expand Down Expand Up @@ -172,8 +173,9 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (stores batch-copy per block)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
f.Uint32(prefix+".batch-cache-limit", DefaultBlockValidatorConfig.BatchCacheLimit, "limit number of old batches to keep in block-validator")
f.String(prefix+".current-module-root", DefaultBlockValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)")
f.Uint64(prefix+".recording-iter-limit", DefaultBlockValidatorConfig.RecordingIterLimit, "limit on block recordings sent per iteration")
f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)")
Expand All @@ -192,8 +194,9 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
BatchCacheLimit: 20,
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Expand All @@ -209,6 +212,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
RedisValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
BatchCacheLimit: 20,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
RecordingIterLimit: 20,
CurrentModuleRoot: "latest",
Expand Down Expand Up @@ -271,6 +275,7 @@ func NewBlockValidator(
progressValidationsChan: make(chan struct{}, 1),
config: config,
fatalErr: fatalErr,
prevBatchCache: make(map[uint64]*FullBatchInfo),
}
if !config().Dangerous.ResetBlockValidation {
validated, err := ret.ReadLastValidatedInfo()
Expand Down Expand Up @@ -571,33 +576,60 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
}
if v.nextCreateStartGS.PosInBatch == 0 || v.nextCreateBatchReread {
// new batch
found, batch, batchBlockHash, count, err := v.readBatch(ctx, v.nextCreateStartGS.Batch)
found, fullBatchInfo, err := v.readBatch(ctx, v.nextCreateStartGS.Batch)
if !found {
return false, err
}
v.nextCreateBatch = batch
v.nextCreateBatchBlockHash = batchBlockHash
v.nextCreateBatchMsgCount = count
if v.nextCreateBatch != nil {
v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch
}
v.nextCreateBatch = fullBatchInfo
// #nosec G115
validatorMsgCountCurrentBatch.Update(int64(count))
validatorMsgCountCurrentBatch.Update(int64(fullBatchInfo.MsgCount))
batchCacheLimit := v.config().BatchCacheLimit
if len(v.prevBatchCache) > int(batchCacheLimit) {
for num := range v.prevBatchCache {
if num < v.nextCreateStartGS.Batch-uint64(batchCacheLimit) {
delete(v.prevBatchCache, num)
}
}
}
v.nextCreateBatchReread = false
}
endGS := validator.GoGlobalState{
BlockHash: endRes.BlockHash,
SendRoot: endRes.SendRoot,
}
if pos+1 < v.nextCreateBatchMsgCount {
if pos+1 < v.nextCreateBatch.MsgCount {
endGS.Batch = v.nextCreateStartGS.Batch
endGS.PosInBatch = v.nextCreateStartGS.PosInBatch + 1
} else if pos+1 == v.nextCreateBatchMsgCount {
} else if pos+1 == v.nextCreateBatch.MsgCount {
endGS.Batch = v.nextCreateStartGS.Batch + 1
endGS.PosInBatch = 0
} else {
return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatchMsgCount, pos, endGS.Batch)
return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatch.MsgCount, pos, endGS.Batch)
}
chainConfig := v.streamer.ChainConfig()
batchReader := func(batchNum uint64) (*FullBatchInfo, error) {
if batchNum == v.nextCreateBatch.Number {
return v.nextCreateBatch, nil
}
// only batch-posting-reports will get here, and there's only one per batch
if entry, found := v.prevBatchCache[batchNum]; found {
delete(v.prevBatchCache, batchNum)
return entry, nil
}
found, entry, err := v.readBatch(ctx, batchNum)
if err != nil {
return nil, err
}
if !found {
return nil, fmt.Errorf("batch %d not found", batchNum)
}
return entry, nil
}
entry, err := newValidationEntry(
pos, v.nextCreateStartGS, endGS, msg, v.nextCreateBatch, v.nextCreateBatchBlockHash, v.nextCreatePrevDelayed, chainConfig,
pos, v.nextCreateStartGS, endGS, msg, batchReader, v.nextCreatePrevDelayed, chainConfig,
)
if err != nil {
return false, err
Expand Down Expand Up @@ -997,6 +1029,9 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
v.nextCreateStartGS = globalState
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
v.nextCreateBatchReread = true
if v.nextCreateBatch != nil {
v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch
}
v.createdA.Store(countUint64)
}
// under the reorg mutex we don't need atomic access
Expand All @@ -1023,6 +1058,7 @@ func (v *BlockValidator) ReorgToBatchCount(count uint64) {
defer v.reorgMutex.Unlock()
if v.nextCreateStartGS.Batch >= count {
v.nextCreateBatchReread = true
v.prevBatchCache = make(map[uint64]*FullBatchInfo)
}
}

Expand Down Expand Up @@ -1063,6 +1099,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
v.nextCreateStartGS = buildGlobalState(*res, endPosition)
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
v.nextCreateBatchReread = true
v.prevBatchCache = make(map[uint64]*FullBatchInfo)
countUint64 := uint64(count)
v.createdA.Store(countUint64)
// under the reorg mutex we don't need atomic access
Expand Down
Loading
Loading