Skip to content

Commit

Permalink
Merge pull request #2505 from OffchainLabs/redis_val_fixes
Browse files Browse the repository at this point in the history
validation spawner only recieves one validation binary
  • Loading branch information
tsahee authored Jul 29, 2024
2 parents 5762e64 + dfac7c5 commit 1b9f7ab
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 65 deletions.
10 changes: 9 additions & 1 deletion arbcompress/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ package arbcompress
#include "arbitrator.h"
*/
import "C"
import "fmt"
import (
"errors"
"fmt"
)

type u8 = C.uint8_t
type u32 = C.uint32_t
Expand Down Expand Up @@ -44,6 +47,8 @@ func Compress(input []byte, level uint32, dictionary Dictionary) ([]byte, error)
return output, nil
}

var ErrOutputWontFit = errors.New("output won't fit in maxsize")

func Decompress(input []byte, maxSize int) ([]byte, error) {
return DecompressWithDictionary(input, maxSize, EmptyDictionary)
}
Expand All @@ -54,6 +59,9 @@ func DecompressWithDictionary(input []byte, maxSize int, dictionary Dictionary)
inbuf := sliceToBuffer(input)

status := C.brotli_decompress(inbuf, outbuf, C.Dictionary(dictionary))
if status == C.BrotliStatus_NeedsMoreOutput {
return nil, ErrOutputWontFit
}
if status != C.BrotliStatus_Success {
return nil, fmt.Errorf("failed decompression: %d", status)
}
Expand Down
54 changes: 40 additions & 14 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ var (
validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil)
validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
validatorProfileWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_record", nil, metrics.NewBoundedHistogramSample())
validatorProfileRecordingHist = metrics.NewRegisteredHistogram("arb/validator/profile/recording", nil, metrics.NewBoundedHistogramSample())
validatorProfileWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_launch", nil, metrics.NewBoundedHistogramSample())
validatorProfileLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/profile/launching", nil, metrics.NewBoundedHistogramSample())
validatorProfileRunningHist = metrics.NewRegisteredHistogram("arb/validator/profile/running", nil, metrics.NewBoundedHistogramSample())
validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
Expand Down Expand Up @@ -210,10 +215,11 @@ const (
)

type validationStatus struct {
Status atomic.Uint32 // atomic: value is one of validationStatus*
Cancel func() // non-atomic: only read/written to with reorg mutex
Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared
Runs []validator.ValidationRun // if status >= ValidationSent
Status atomic.Uint32 // atomic: value is one of validationStatus*
Cancel func() // non-atomic: only read/written to with reorg mutex
Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared
Runs []validator.ValidationRun // if status >= ValidationSent
profileTS int64 // time-stamp for profiling
}

func (s *validationStatus) getStatus() valStatusField {
Expand All @@ -225,6 +231,13 @@ func (s *validationStatus) replaceStatus(old, new valStatusField) bool {
return s.Status.CompareAndSwap(uint32(old), uint32(new))
}

// gets how many miliseconds last step took, and starts measuring a new step
func (s *validationStatus) profileStep() int64 {
start := s.profileTS
s.profileTS = time.Now().UnixMilli()
return s.profileTS - start
}

func NewBlockValidator(
statelessBlockValidator *StatelessBlockValidator,
inbox InboxTrackerInterface,
Expand Down Expand Up @@ -447,6 +460,8 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error {
if !s.replaceStatus(Created, RecordSent) {
return fmt.Errorf("failed status check for send record. Status: %v", s.getStatus())
}

validatorProfileWaitToRecordHist.Update(s.profileStep())
v.LaunchThread(func(ctx context.Context) {
err := v.ValidationEntryRecord(ctx, s.Entry)
if ctx.Err() != nil {
Expand All @@ -457,6 +472,7 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error {
log.Error("Error while recording", "err", err, "status", s.getStatus())
return
}
validatorProfileRecordingHist.Update(s.profileStep())
if !s.replaceStatus(RecordSent, Prepared) {
log.Error("Fault trying to update validation with recording", "entry", s.Entry, "status", s.getStatus())
return
Expand All @@ -468,7 +484,7 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error {

//nolint:gosec
func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoot common.Hash) error {
input, err := validationEntry.ToInput()
input, err := validationEntry.ToInput([]string{"wavm"})
if err != nil {
return err
}
Expand Down Expand Up @@ -585,7 +601,8 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
return false, err
}
status := &validationStatus{
Entry: entry,
Entry: entry,
profileTS: time.Now().UnixMilli(),
}
status.Status.Store(uint32(Created))
v.validations.Store(pos, status)
Expand Down Expand Up @@ -792,12 +809,13 @@ validationsLoop:
continue
}
for _, moduleRoot := range wasmRoots {
if v.chosenValidator[moduleRoot] == nil {
spawner := v.chosenValidator[moduleRoot]
if spawner == nil {
notFoundErr := fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot)
v.possiblyFatal(notFoundErr)
return nil, notFoundErr
}
if v.chosenValidator[moduleRoot].Room() == 0 {
if spawner.Room() == 0 {
log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot)
return nil, nil
}
Expand All @@ -807,28 +825,35 @@ validationsLoop:
return nil, nil
}
if currentStatus == Prepared {
input, err := validationStatus.Entry.ToInput()
if err != nil && ctx.Err() == nil {
v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err))
continue
}
replaced := validationStatus.replaceStatus(Prepared, SendingValidation)
if !replaced {
v.possiblyFatal(errors.New("failed to set SendingValidation status"))
}
validatorProfileWaitToLaunchHist.Update(validationStatus.profileStep())
validatorPendingValidationsGauge.Inc(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
spawner := v.chosenValidator[moduleRoot]
input, err := validationStatus.Entry.ToInput(spawner.StylusArchs())
if err != nil && ctx.Err() == nil {
v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err))
continue
}
if ctx.Err() != nil {
return nil, ctx.Err()
}
run := spawner.Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
}
validatorProfileLaunchingHist.Update(validationStatus.profileStep())
validationCtx, cancel := context.WithCancel(ctx)
validationStatus.Runs = runs
validationStatus.Cancel = cancel
v.LaunchUntrackedThread(func() {
defer validatorPendingValidationsGauge.Dec(1)
defer cancel()
startTsMilli := validationStatus.profileTS
replaced = validationStatus.replaceStatus(SendingValidation, ValidationSent)
if !replaced {
v.possiblyFatal(errors.New("failed to set status to ValidationSent"))
Expand All @@ -842,6 +867,7 @@ validationsLoop:
return
}
}
validatorProfileRunningHist.Update(time.Now().UnixMilli() - startTsMilli)
nonBlockingTrigger(v.progressValidationsChan)
})
}
Expand Down
2 changes: 1 addition & 1 deletion staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
if err != nil {
return fmt.Errorf("error creating validation entry for challenge %v msg %v for execution challenge: %w", m.challengeIndex, initialCount, err)
}
input, err := entry.ToInput()
input, err := entry.ToInput([]string{"wavm"})
if err != nil {
return fmt.Errorf("error getting validation entry input of challenge %v msg %v: %w", m.challengeIndex, initialCount, err)
}
Expand Down
36 changes: 28 additions & 8 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/url"
"runtime"
"testing"

"github.com/offchainlabs/nitro/arbstate/daprovider"
Expand Down Expand Up @@ -134,21 +135,36 @@ type validationEntry struct {
DelayedMsg []byte
}

func (e *validationEntry) ToInput() (*validator.ValidationInput, error) {
func (e *validationEntry) ToInput(stylusArchs []string) (*validator.ValidationInput, error) {
if e.Stage != Ready {
return nil, errors.New("cannot create input from non-ready entry")
}
return &validator.ValidationInput{
res := validator.ValidationInput{
Id: uint64(e.Pos),
HasDelayedMsg: e.HasDelayedMsg,
DelayedMsgNr: e.DelayedMsgNr,
Preimages: e.Preimages,
UserWasms: e.UserWasms,
UserWasms: make(map[string]map[common.Hash][]byte, len(e.UserWasms)),
BatchInfo: e.BatchInfo,
DelayedMsg: e.DelayedMsg,
StartState: e.Start,
DebugChain: e.ChainConfig.DebugMode(),
}, nil
}
for _, stylusArch := range stylusArchs {
res.UserWasms[stylusArch] = make(map[common.Hash][]byte)
}
for hash, info := range e.UserWasms {
for _, stylusArch := range stylusArchs {
if stylusArch == "wavm" {
res.UserWasms[stylusArch][hash] = info.Module
} else if stylusArch == runtime.GOARCH {
res.UserWasms[stylusArch][hash] = info.Asm
} else {
return nil, fmt.Errorf("stylusArch not supported by block validator: %v", stylusArch)
}
}
}
return &res, nil
}

func newValidationEntry(
Expand Down Expand Up @@ -373,21 +389,25 @@ func (v *StatelessBlockValidator) ValidateResult(
if err != nil {
return false, nil, err
}
input, err := entry.ToInput()
if err != nil {
return false, nil, err
}
var run validator.ValidationRun
if !useExec {
if v.redisValidator != nil {
if validator.SpawnerSupportsModule(v.redisValidator, moduleRoot) {
input, err := entry.ToInput(v.redisValidator.StylusArchs())
if err != nil {
return false, nil, err
}
run = v.redisValidator.Launch(input, moduleRoot)
}
}
}
if run == nil {
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
input, err := entry.ToInput(spawner.StylusArchs())
if err != nil {
return false, nil, err
}
run = spawner.Launch(input, moduleRoot)
break
}
Expand Down
4 changes: 4 additions & 0 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *mockSpawner) WasmModuleRoots() ([]common.Hash, error) {
return mockWasmModuleRoots, nil
}

func (s *mockSpawner) StylusArchs() []string {
return []string{"mock"}
}

func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
run := &mockValRun{
Promise: containers.NewPromise[validator.GoGlobalState](nil),
Expand Down
8 changes: 8 additions & 0 deletions validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ValidationClientConfig struct {
StreamPrefix string `koanf:"stream-prefix"`
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
StylusArchs []string `koanf:"stylus-archs"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
CreateStreams bool `koanf:"create-streams"`
}
Expand All @@ -35,6 +36,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{
Name: "redis validation client",
Room: 2,
RedisURL: "",
StylusArchs: []string{"wavm"},
ProducerConfig: pubsub.DefaultProducerConfig,
CreateStreams: true,
}
Expand All @@ -44,6 +46,7 @@ var TestValidationClientConfig = ValidationClientConfig{
Room: 2,
RedisURL: "",
StreamPrefix: "test-",
StylusArchs: []string{"wavm"},
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}
Expand All @@ -53,6 +56,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url")
f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name")
f.StringSlice(prefix+".stylus-archs", DefaultValidationClientConfig.StylusArchs, "archs required for stylus workers")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}
Expand Down Expand Up @@ -148,6 +152,10 @@ func (c *ValidationClient) Name() string {
return c.config.Name
}

func (c *ValidationClient) StylusArchs() []string {
return c.config.StylusArchs
}

func (c *ValidationClient) Room() int {
return int(c.room.Load())
}
Loading

0 comments on commit 1b9f7ab

Please sign in to comment.