Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validation spawner only recieves one validation binary #2505

Merged
merged 12 commits into from
Jul 29, 2024
57 changes: 38 additions & 19 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ import (
)

var (
validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil)
validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil)
validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil)
validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
validatorValidationWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRecord", nil, metrics.NewBoundedHistogramSample())
validatorValidationRecordingHist = metrics.NewRegisteredHistogram("arb/validator/validations/recording", nil, metrics.NewBoundedHistogramSample())
validatorValidationWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRun", nil, metrics.NewBoundedHistogramSample())
validatorValidationLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRun", nil, metrics.NewBoundedHistogramSample())
Copy link
Contributor

@magicxyyz magicxyyz Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here the metric name needs to be fixed
How about naming it .../validations/launch and previous one .../validations/waitToLaunch to match the variable names?
Alternatively, we could rename the metrics variables to match the names

validatorValidationRunningHist = metrics.NewRegisteredHistogram("arb/validator/validations/running", nil, metrics.NewBoundedHistogramSample())
validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil)
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
)

type BlockValidator struct {
Expand Down Expand Up @@ -210,10 +215,11 @@ const (
)

type validationStatus struct {
Status atomic.Uint32 // atomic: value is one of validationStatus*
Cancel func() // non-atomic: only read/written to with reorg mutex
Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared
Runs []validator.ValidationRun // if status >= ValidationSent
Status atomic.Uint32 // atomic: value is one of validationStatus*
Cancel func() // non-atomic: only read/written to with reorg mutex
Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared
Runs []validator.ValidationRun // if status >= ValidationSent
tsMilli int64
}

func (s *validationStatus) getStatus() valStatusField {
Expand All @@ -225,6 +231,12 @@ func (s *validationStatus) replaceStatus(old, new valStatusField) bool {
return s.Status.CompareAndSwap(uint32(old), uint32(new))
}

func (s *validationStatus) timeStampInterval() int64 {
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
start := s.tsMilli
s.tsMilli = time.Now().UnixMilli()
return s.tsMilli - start
}

func NewBlockValidator(
statelessBlockValidator *StatelessBlockValidator,
inbox InboxTrackerInterface,
Expand Down Expand Up @@ -447,6 +459,8 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error {
if !s.replaceStatus(Created, RecordSent) {
return fmt.Errorf("failed status check for send record. Status: %v", s.getStatus())
}

validatorValidationWaitToRecordHist.Update(s.timeStampInterval())
v.LaunchThread(func(ctx context.Context) {
err := v.ValidationEntryRecord(ctx, s.Entry)
if ctx.Err() != nil {
Expand All @@ -457,18 +471,18 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error {
log.Error("Error while recording", "err", err, "status", s.getStatus())
return
}
validatorValidationRecordingHist.Update(s.timeStampInterval())
if !s.replaceStatus(RecordSent, Prepared) {
log.Error("Fault trying to update validation with recording", "entry", s.Entry, "status", s.getStatus())
return
}
nonBlockingTrigger(v.progressValidationsChan)
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
})
return nil
}

//nolint:gosec
func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoot common.Hash) error {
input, err := validationEntry.ToInput()
input, err := validationEntry.ToInput([]string{"wavm"})
if err != nil {
return err
}
Expand Down Expand Up @@ -585,7 +599,8 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
return false, err
}
status := &validationStatus{
Entry: entry,
Entry: entry,
tsMilli: time.Now().UnixMilli(),
}
status.Status.Store(uint32(Created))
v.validations.Store(pos, status)
Expand Down Expand Up @@ -807,28 +822,31 @@ validationsLoop:
return nil, nil
}
if currentStatus == Prepared {
input, err := validationStatus.Entry.ToInput()
if err != nil && ctx.Err() == nil {
v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err))
continue
}
replaced := validationStatus.replaceStatus(Prepared, SendingValidation)
if !replaced {
v.possiblyFatal(errors.New("failed to set SendingValidation status"))
}
validatorValidationWaitToLaunchHist.Update(validationStatus.timeStampInterval())
validatorPendingValidationsGauge.Inc(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
input, err := validationStatus.Entry.ToInput(v.chosenValidator[moduleRoot].StylusArchs())
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
if err != nil && ctx.Err() == nil {
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err))
continue
}
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
}
validatorValidationLaunchHist.Update(validationStatus.timeStampInterval())
validationCtx, cancel := context.WithCancel(ctx)
validationStatus.Runs = runs
validationStatus.Cancel = cancel
v.LaunchUntrackedThread(func() {
defer validatorPendingValidationsGauge.Dec(1)
defer cancel()
startTsMilli := validationStatus.tsMilli
replaced = validationStatus.replaceStatus(SendingValidation, ValidationSent)
if !replaced {
v.possiblyFatal(errors.New("failed to set status to ValidationSent"))
Expand All @@ -842,6 +860,7 @@ validationsLoop:
return
}
}
validatorValidationRunningHist.Update(time.Now().UnixMilli() - startTsMilli)
nonBlockingTrigger(v.progressValidationsChan)
})
}
Expand Down
2 changes: 1 addition & 1 deletion staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
if err != nil {
return fmt.Errorf("error creating validation entry for challenge %v msg %v for execution challenge: %w", m.challengeIndex, initialCount, err)
}
input, err := entry.ToInput()
input, err := entry.ToInput([]string{"wavm"})
if err != nil {
return fmt.Errorf("error getting validation entry input of challenge %v msg %v: %w", m.challengeIndex, initialCount, err)
}
Expand Down
36 changes: 28 additions & 8 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/url"
"runtime"
"testing"

"github.com/offchainlabs/nitro/arbstate/daprovider"
Expand Down Expand Up @@ -134,21 +135,36 @@ type validationEntry struct {
DelayedMsg []byte
}

func (e *validationEntry) ToInput() (*validator.ValidationInput, error) {
func (e *validationEntry) ToInput(stylusArchs []string) (*validator.ValidationInput, error) {
if e.Stage != Ready {
return nil, errors.New("cannot create input from non-ready entry")
}
return &validator.ValidationInput{
res := validator.ValidationInput{
Id: uint64(e.Pos),
HasDelayedMsg: e.HasDelayedMsg,
DelayedMsgNr: e.DelayedMsgNr,
Preimages: e.Preimages,
UserWasms: e.UserWasms,
UserWasms: make(map[string]map[common.Hash][]byte, len(e.UserWasms)),
BatchInfo: e.BatchInfo,
DelayedMsg: e.DelayedMsg,
StartState: e.Start,
DebugChain: e.ChainConfig.DebugMode(),
}, nil
}
for _, stylusArch := range stylusArchs {
res.UserWasms[stylusArch] = make(map[common.Hash][]byte)
}
for hash, info := range e.UserWasms {
for _, stylusArch := range stylusArchs {
if stylusArch == "wavm" {
res.UserWasms[stylusArch][hash] = info.Module
} else if stylusArch == runtime.GOARCH {
res.UserWasms[stylusArch][hash] = info.Asm
} else {
return nil, fmt.Errorf("stylusArch not supported by block validator: %v", stylusArch)
}
}
}
return &res, nil
}

func newValidationEntry(
Expand Down Expand Up @@ -373,21 +389,25 @@ func (v *StatelessBlockValidator) ValidateResult(
if err != nil {
return false, nil, err
}
input, err := entry.ToInput()
if err != nil {
return false, nil, err
}
var run validator.ValidationRun
if !useExec {
if v.redisValidator != nil {
if validator.SpawnerSupportsModule(v.redisValidator, moduleRoot) {
input, err := entry.ToInput(v.redisValidator.StylusArchs())
if err != nil {
return false, nil, err
}
run = v.redisValidator.Launch(input, moduleRoot)
}
}
}
if run == nil {
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
input, err := entry.ToInput(spawner.StylusArchs())
if err != nil {
return false, nil, err
}
run = spawner.Launch(input, moduleRoot)
break
}
Expand Down
4 changes: 4 additions & 0 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *mockSpawner) WasmModuleRoots() ([]common.Hash, error) {
return mockWasmModuleRoots, nil
}

func (s *mockSpawner) StylusArchs() []string {
return []string{"mock"}
}

func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
run := &mockValRun{
Promise: containers.NewPromise[validator.GoGlobalState](nil),
Expand Down
8 changes: 8 additions & 0 deletions validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ValidationClientConfig struct {
StreamPrefix string `koanf:"stream-prefix"`
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
StylusArchs []string `koanf:"stylus-archs"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
CreateStreams bool `koanf:"create-streams"`
}
Expand All @@ -35,6 +36,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{
Name: "redis validation client",
Room: 2,
RedisURL: "",
StylusArchs: []string{"wavm"},
ProducerConfig: pubsub.DefaultProducerConfig,
CreateStreams: true,
}
Expand All @@ -44,6 +46,7 @@ var TestValidationClientConfig = ValidationClientConfig{
Room: 2,
RedisURL: "",
StreamPrefix: "test-",
StylusArchs: []string{"wavm"},
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}
Expand All @@ -53,6 +56,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url")
f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name")
f.StringSlice(prefix+".stylus-archs", DefaultValidationClientConfig.StylusArchs, "archs required for stylus workers")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}
Expand Down Expand Up @@ -148,6 +152,10 @@ func (c *ValidationClient) Name() string {
return c.config.Name
}

func (c *ValidationClient) StylusArchs() []string {
return c.config.StylusArchs
}

func (c *ValidationClient) Room() int {
return int(c.room.Load())
}
30 changes: 27 additions & 3 deletions validator/client/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"runtime"
"sync/atomic"
"time"

Expand All @@ -29,13 +30,16 @@ type ValidationClient struct {
stopwaiter.StopWaiter
client *rpcclient.RpcClient
name string
stylusArchs []string
room atomic.Int32
wasmModuleRoots []common.Hash
}

func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ValidationClient {
return &ValidationClient{
client: rpcclient.NewRpcClient(config, stack),
client: rpcclient.NewRpcClient(config, stack),
name: "not started",
stylusArchs: []string{"not started"},
}
}

Expand Down Expand Up @@ -64,15 +68,27 @@ func (c *ValidationClient) Start(ctx_in context.Context) error {
if len(name) == 0 {
return errors.New("couldn't read name from server")
}
var stylusArchs []string
if err := c.client.CallContext(ctx, &stylusArchs, server_api.Namespace+"_stylusArchs"); err != nil {
return err
}
if len(stylusArchs) == 0 {
return fmt.Errorf("could not read stylus archs from validation server")
}
for _, stylusArch := range stylusArchs {
if stylusArch != "wavm" && stylusArch != runtime.GOARCH && stylusArch != "mock" {
return fmt.Errorf("unsupported stylus architecture: %v", stylusArch)
}
}
var moduleRoots []common.Hash
if err := c.client.CallContext(c.GetContext(), &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil {
if err := c.client.CallContext(ctx, &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil {
return err
}
if len(moduleRoots) == 0 {
return fmt.Errorf("server reported no wasmModuleRoots")
}
var room int
if err := c.client.CallContext(c.GetContext(), &room, server_api.Namespace+"_room"); err != nil {
if err := c.client.CallContext(ctx, &room, server_api.Namespace+"_room"); err != nil {
return err
}
if room < 2 {
Expand All @@ -84,6 +100,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error {
c.room.Store(int32(room))
c.wasmModuleRoots = moduleRoots
c.name = name
c.stylusArchs = stylusArchs
PlasmaPower marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand All @@ -94,6 +111,13 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) {
return nil, errors.New("not started")
}

func (c *ValidationClient) StylusArchs() []string {
if c.Started() {
return c.stylusArchs
}
return []string{"not started"}
}

func (c *ValidationClient) Stop() {
c.StopWaiter.StopOnly()
if c.client != nil {
Expand Down
1 change: 1 addition & 0 deletions validator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ValidationSpawner interface {
Start(context.Context) error
Stop()
Name() string
StylusArchs() []string
Room() int
}

Expand Down
Loading
Loading