Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2797] block validator: allow large distance between outgoing validations #2797

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 93 additions & 21 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ import (
)

var (
validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil)
validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
validatorProfileWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_record", nil, metrics.NewBoundedHistogramSample())
validatorProfileRecordingHist = metrics.NewRegisteredHistogram("arb/validator/profile/recording", nil, metrics.NewBoundedHistogramSample())
validatorProfileWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_launch", nil, metrics.NewBoundedHistogramSample())
validatorProfileLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/profile/launching", nil, metrics.NewBoundedHistogramSample())
validatorProfileRunningHist = metrics.NewRegisteredHistogram("arb/validator/profile/running", nil, metrics.NewBoundedHistogramSample())
validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil)
validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil)
validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
validatorProfileWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_record", nil, metrics.NewBoundedHistogramSample())
validatorProfileRecordingHist = metrics.NewRegisteredHistogram("arb/validator/profile/recording", nil, metrics.NewBoundedHistogramSample())
validatorProfileWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_launch", nil, metrics.NewBoundedHistogramSample())
validatorProfileLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/profile/launching", nil, metrics.NewBoundedHistogramSample())
validatorProfileRunningHist = metrics.NewRegisteredHistogram("arb/validator/profile/running", nil, metrics.NewBoundedHistogramSample())
validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil)
validatorMsgCountLastValidationSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_last_validation_sent", nil)
)

type BlockValidator struct {
Expand Down Expand Up @@ -77,10 +78,11 @@ type BlockValidator struct {

// can be read (atomic.Load) by anyone holding reorg-read
// written (atomic.Set) by appropriate thread or (any way) holding reorg-write
createdA atomic.Uint64
recordSentA atomic.Uint64
validatedA atomic.Uint64
validations containers.SyncMap[arbutil.MessageIndex, *validationStatus]
createdA atomic.Uint64
recordSentA atomic.Uint64
validatedA atomic.Uint64
lastValidationSentA atomic.Uint64
validations containers.SyncMap[arbutil.MessageIndex, *validationStatus]

config BlockValidatorConfigFetcher

Expand Down Expand Up @@ -114,6 +116,7 @@ type BlockValidatorConfig struct {
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
RecordingIterLimit uint64 `koanf:"recording-iter-limit"`
ValidationSentLimit uint64 `koanf:"validation-sent-limit"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
BatchCacheLimit uint32 `koanf:"batch-cache-limit"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
Expand Down Expand Up @@ -188,6 +191,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Uint32(prefix+".batch-cache-limit", DefaultBlockValidatorConfig.BatchCacheLimit, "limit number of old batches to keep in block-validator")
f.String(prefix+".current-module-root", DefaultBlockValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)")
f.Uint64(prefix+".recording-iter-limit", DefaultBlockValidatorConfig.RecordingIterLimit, "limit on block recordings sent per iteration")
f.Uint64(prefix+".validation-sent-limit", DefaultBlockValidatorConfig.ValidationSentLimit, "limit on block validations to keep in validation sent state")
f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)")
f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error")
BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f)
Expand Down Expand Up @@ -215,6 +219,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
BlockInputsFilePath: "./target/validation_inputs",
MemoryFreeLimit: "default",
RecordingIterLimit: 20,
ValidationSentLimit: 1024,
}

var TestBlockValidatorConfig = BlockValidatorConfig{
Expand All @@ -227,6 +232,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
BatchCacheLimit: 20,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
RecordingIterLimit: 20,
ValidationSentLimit: 1024,
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Expand Down Expand Up @@ -364,6 +370,10 @@ func (v *BlockValidator) validated() arbutil.MessageIndex {
return atomicLoadPos(&v.validatedA)
}

// lastValidationSent returns the position currently held in
// lastValidationSentA, read through atomicLoadPos. The field is declared
// with the other position counters that the struct comment says may be
// read by anyone holding reorg-read.
func (v *BlockValidator) lastValidationSent() arbutil.MessageIndex {
	sentPos := atomicLoadPos(&v.lastValidationSentA)
	return sentPos
}

// Validated is an exported wrapper around validated(). The *testing.T
// argument is not used by the body; presumably it exists so that only
// test code can call this accessor (confirm against callers).
func (v *BlockValidator) Validated(t *testing.T) arbutil.MessageIndex {
	validatedPos := v.validated()
	return validatedPos
}
Expand Down Expand Up @@ -572,7 +582,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
v.reorgMutex.RLock()
defer v.reorgMutex.RUnlock()
pos := v.created()
if pos > v.validated()+arbutil.MessageIndex(v.config().ForwardBlocks) {
if pos > v.recordSent()+arbutil.MessageIndex(v.config().ForwardBlocks) {
log.Trace("create validation entry: nothing to do", "pos", pos, "validated", v.validated())
return false, nil
}
Expand Down Expand Up @@ -699,10 +709,10 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro
v.reorgMutex.RLock()
pos := v.recordSent()
created := v.created()
validated := v.validated()
validationSent := v.lastValidationSent()
v.reorgMutex.RUnlock()

recordUntil := validated + arbutil.MessageIndex(v.config().PrerecordedBlocks) - 1
recordUntil := validationSent + arbutil.MessageIndex(v.config().PrerecordedBlocks) - 1
if recordUntil > created-1 {
recordUntil = created - 1
}
Expand Down Expand Up @@ -800,7 +810,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa
v.reorgMutex.RLock()
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
pos := v.validated() - 1 // to reverse the first +1 in the loop
validationsLoop:
for {
Expand Down Expand Up @@ -866,7 +875,38 @@ validationsLoop:
nonBlockingTrigger(v.testingProgressMadeChan)
}
log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash)
continue
}
}
}

// return val:
// *MessageIndex - pointer to bad entry if there is one (requires reorg)
func (v *BlockValidator) sendValidations(ctx context.Context) (*arbutil.MessageIndex, error) {
v.reorgMutex.RLock()
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
pos := v.lastValidationSent()
sendValidationUntil := v.validated() + arbutil.MessageIndex(v.config().ValidationSentLimit)
for pos <= sendValidationUntil {
if ctx.Err() != nil {
return nil, ctx.Err()
}
v.reorgMutex.RUnlock()
v.reorgMutex.RLock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After getting the reorgMutex, you need to read lastValidationSent() into pos again, because if there was a reorg it could have changed that pointer.

if pos >= v.recordSent() {
log.Trace("advanceValidations: nothing to validate", "pos", pos)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Debug

return nil, nil
}
validationStatus, found := v.validations.Load(pos)
if !found {
return nil, fmt.Errorf("not found entry for pos %d", pos)
}
currentStatus := validationStatus.getStatus()
if currentStatus == RecordFailed {
// retry
log.Warn("Recording for validation failed, retrying..", "pos", pos)
return &pos, nil
}
for _, moduleRoot := range wasmRoots {
spawner := v.chosenValidator[moduleRoot]
Expand Down Expand Up @@ -931,7 +971,11 @@ validationsLoop:
nonBlockingTrigger(v.progressValidationsChan)
})
}
pos += 1
atomicStorePos(&v.lastValidationSentA, pos, validatorMsgCountLastValidationSentGauge)
log.Trace("validation sent", "pos", pos)
}
return nil, nil
}

func (v *BlockValidator) iterativeValidationProgress(ctx context.Context, ignored struct{}) time.Duration {
Expand All @@ -948,6 +992,20 @@ func (v *BlockValidator) iterativeValidationProgress(ctx context.Context, ignore
return v.config().ValidationPoll
}

// iterativeValidationSentProgress performs a single sendValidations pass.
//
// If sendValidations fails, the error is logged and nothing else is done.
// If it instead returns a non-nil position (a bad entry that requires a
// reorg), Reorg is called with that position; a Reorg failure is logged and
// handed to possiblyFatal.
//
// The ignored parameter is unused. The return value is always the configured
// ValidationPoll duration (presumably the delay before the next iteration).
func (v *BlockValidator) iterativeValidationSentProgress(ctx context.Context, ignored struct{}) time.Duration {
	badPos, sendErr := v.sendValidations(ctx)
	switch {
	case sendErr != nil:
		log.Error("error trying to send validation node", "err", sendErr)
	case badPos != nil:
		if reorgErr := v.Reorg(ctx, *badPos); reorgErr != nil {
			log.Error("error trying to reorg validation", "pos", *badPos-1, "err", reorgErr)
			v.possiblyFatal(reorgErr)
		}
	}
	return v.config().ValidationPoll
}

var ErrValidationCanceled = errors.New("validation of block cancelled")

func (v *BlockValidator) writeLastValidated(gs validator.GoGlobalState, wasmRoots []common.Hash) error {
Expand Down Expand Up @@ -1059,6 +1117,10 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
if v.recordSentA.Load() < countUint64 {
v.recordSentA.Store(countUint64)
}

if v.lastValidationSentA.Load() < countUint64 {
v.lastValidationSentA.Store(countUint64)
}
// #nosec G115
v.validatedA.Store(countUint64)
v.valLoopPos = count
Expand Down Expand Up @@ -1127,6 +1189,11 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
if v.recordSentA.Load() > countUint64 {
v.recordSentA.Store(countUint64)
}
if v.lastValidationSentA.Load() > countUint64 {
v.lastValidationSentA.Store(countUint64)
// #nosec G115
validatorMsgCountLastValidationSentGauge.Update(int64(countUint64))
}
if v.validatedA.Load() > countUint64 {
v.validatedA.Store(countUint64)
// #nosec G115
Expand Down Expand Up @@ -1321,6 +1388,7 @@ func (v *BlockValidator) checkValidatedGSCaughtUp() (bool, error) {
atomicStorePos(&v.createdA, count, validatorMsgCountCreatedGauge)
atomicStorePos(&v.recordSentA, count, validatorMsgCountRecordSentGauge)
atomicStorePos(&v.validatedA, count, validatorMsgCountValidatedGauge)
atomicStorePos(&v.lastValidationSentA, count, validatorMsgCountLastValidationSentGauge)
// #nosec G115
validatorMsgCountValidatedGauge.Update(int64(count))
v.chainCaughtUp = true
Expand Down Expand Up @@ -1354,6 +1422,10 @@ func (v *BlockValidator) LaunchWorkthreadsWhenCaughtUp(ctx context.Context) {
if err != nil {
v.possiblyFatal(err)
}
err = stopwaiter.CallIterativelyWith[struct{}](&v.StopWaiterSafe, v.iterativeValidationSentProgress, v.progressValidationsChan)
if err != nil {
v.possiblyFatal(err)
}
err = stopwaiter.CallIterativelyWith[struct{}](&v.StopWaiterSafe, v.iterativeValidationProgress, v.progressValidationsChan)
if err != nil {
v.possiblyFatal(err)
Expand Down
Loading