From 52df895e99c0988a6cadb879ddd271bcb8bb93f7 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Thu, 18 Jul 2024 17:50:14 -0600 Subject: [PATCH 1/9] validation spawner only recieves one validation binary --- staker/block_validator.go | 12 +++---- staker/challenge_manager.go | 2 +- staker/stateless_block_validator.go | 32 ++++++++++++++----- system_tests/validation_mock_test.go | 4 +++ validator/client/redis/producer.go | 8 +++++ validator/client/validation_client.go | 25 +++++++++++++-- validator/interface.go | 1 + validator/server_api/json.go | 38 ++++++++++------------- validator/server_arb/validator_spawner.go | 11 +++++-- validator/server_jit/jit_machine.go | 8 +++-- validator/server_jit/spawner.go | 4 +++ validator/validation_entry.go | 4 +-- validator/valnode/validation_api.go | 4 +++ 13 files changed, 107 insertions(+), 46 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index bfb7c24ac6..04851b700f 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -468,7 +468,7 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error { //nolint:gosec func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoot common.Hash) error { - input, err := validationEntry.ToInput() + input, err := validationEntry.ToInput("wavm") if err != nil { return err } @@ -807,11 +807,6 @@ validationsLoop: return nil, nil } if currentStatus == Prepared { - input, err := validationStatus.Entry.ToInput() - if err != nil && ctx.Err() == nil { - v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err)) - continue - } replaced := validationStatus.replaceStatus(Prepared, SendingValidation) if !replaced { v.possiblyFatal(errors.New("failed to set SendingValidation status")) @@ -819,6 +814,11 @@ validationsLoop: validatorPendingValidationsGauge.Inc(1) var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { + input, err := validationStatus.Entry.ToInput(v.chosenValidator[moduleRoot].StylusArch()) + if err != nil && ctx.Err() == nil { + v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err)) + continue + } run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot) log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot) runs = append(runs, run) diff --git a/staker/challenge_manager.go b/staker/challenge_manager.go index 22897e3c1d..f50b85064c 100644 --- a/staker/challenge_manager.go +++ b/staker/challenge_manager.go @@ -467,7 +467,7 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint if err != nil { return fmt.Errorf("error creating validation entry for challenge %v msg %v for execution challenge: %w", m.challengeIndex, initialCount, err) } - input, err := entry.ToInput() + input, err := entry.ToInput("wavm") if err != nil { return fmt.Errorf("error getting validation entry input of challenge %v msg %v: %w", m.challengeIndex, initialCount, err) } diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index ec235c4bf5..7daa1ab872 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net/url" + "runtime" "testing" "github.com/offchainlabs/nitro/arbstate/daprovider" @@ -134,21 +135,32 @@ type validationEntry struct { DelayedMsg []byte } -func (e *validationEntry) ToInput() (*validator.ValidationInput, error) { +func (e *validationEntry) ToInput(stylusArch string) (*validator.ValidationInput, error) { if e.Stage != Ready { return nil, errors.New("cannot create input from non-ready entry") } - return &validator.ValidationInput{ + res := validator.ValidationInput{ Id: uint64(e.Pos), HasDelayedMsg: e.HasDelayedMsg, DelayedMsgNr: e.DelayedMsgNr, Preimages: e.Preimages, - UserWasms: e.UserWasms, + StylusArch: stylusArch, + UserWasms: make(map[common.Hash][]byte, len(e.UserWasms)), BatchInfo: e.BatchInfo, DelayedMsg: e.DelayedMsg, StartState: e.Start, DebugChain: e.ChainConfig.DebugMode(), - }, nil + } + for hash, info := range e.UserWasms { + if stylusArch == "wavm" { + res.UserWasms[hash] = info.Module + } else if stylusArch == runtime.GOARCH { + res.UserWasms[hash] = info.Asm + } else { + return nil, fmt.Errorf("stylusArch not supported by block validator: %v", stylusArch) + } + } + return &res, nil } func newValidationEntry( @@ -373,14 +385,14 @@ func (v *StatelessBlockValidator) ValidateResult( if err != nil { return false, nil, err } - input, err := entry.ToInput() - if err != nil { - return false, nil, err - } var run validator.ValidationRun if !useExec { if v.redisValidator != nil { if validator.SpawnerSupportsModule(v.redisValidator, moduleRoot) { + input, err := entry.ToInput(v.redisValidator.StylusArch()) + if err != nil { + return false, nil, err + } run = v.redisValidator.Launch(input, moduleRoot) } } @@ -388,6 +400,10 @@ func (v *StatelessBlockValidator) ValidateResult( if run == nil { for _, spawner := range v.execSpawners { if validator.SpawnerSupportsModule(spawner, moduleRoot) { + input, err := entry.ToInput(spawner.StylusArch()) + if err != nil { + return false, nil, err + } run = spawner.Launch(input, moduleRoot) break } diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 1330f24882..a1d9f314cc 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -60,6 +60,10 @@ func (s *mockSpawner) WasmModuleRoots() ([]common.Hash, error) { return mockWasmModuleRoots, nil } +func (s *mockSpawner) StylusArch() string { + return "mock" +} + func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { run := &mockValRun{ Promise: containers.NewPromise[validator.GoGlobalState](nil), diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 0adedc6784..4a81ae1fdf 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -23,6 +23,7 @@ type ValidationClientConfig struct { StreamPrefix string `koanf:"stream-prefix"` Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` + StylusArch string `koanf:"stylus-arch"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` CreateStreams bool `koanf:"create-streams"` } @@ -35,6 +36,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{ Name: "redis validation client", Room: 2, RedisURL: "", + StylusArch: "wavm", ProducerConfig: pubsub.DefaultProducerConfig, CreateStreams: true, } @@ -44,6 +46,7 @@ var TestValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", StreamPrefix: "test-", + StylusArch: "wavm", ProducerConfig: pubsub.TestProducerConfig, CreateStreams: false, } @@ -53,6 +56,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room") f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url") f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name") + f.String(prefix+".stylus-arch", DefaultValidationClientConfig.StylusArch, "arch for stylus workers") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } @@ -148,6 +152,10 @@ func (c *ValidationClient) Name() string { return c.config.Name } +func (c *ValidationClient) StylusArch() string { + return c.config.StylusArch +} + func (c *ValidationClient) Room() int { return int(c.room.Load()) } diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 79ecc6bdf4..610f3eb01f 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "errors" "fmt" + "runtime" "sync/atomic" "time" @@ -29,13 +30,16 @@ type ValidationClient struct { stopwaiter.StopWaiter client *rpcclient.RpcClient name string + stylusArch string room atomic.Int32 wasmModuleRoots []common.Hash } func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ValidationClient { return &ValidationClient{ - client: rpcclient.NewRpcClient(config, stack), + client: rpcclient.NewRpcClient(config, stack), + name: "not started", + stylusArch: "not started", } } @@ -64,15 +68,22 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { if len(name) == 0 { return errors.New("couldn't read name from server") } + var stylusArch string + if err := c.client.CallContext(ctx, &stylusArch, server_api.Namespace+"_stylusArch"); err != nil { + return err + } + if stylusArch != "wavm" && stylusArch != runtime.GOARCH && stylusArch != "mock" { + return fmt.Errorf("unsupported stylus architecture: %v", stylusArch) + } var moduleRoots []common.Hash - if err := c.client.CallContext(c.GetContext(), &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil { + if err := c.client.CallContext(ctx, &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil { return err } if len(moduleRoots) == 0 { return fmt.Errorf("server reported no wasmModuleRoots") } var room int - if err := c.client.CallContext(c.GetContext(), &room, server_api.Namespace+"_room"); err != nil { + if err := c.client.CallContext(ctx, &room, server_api.Namespace+"_room"); err != nil { return err } if room < 2 { @@ -84,6 +95,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { c.room.Store(int32(room)) c.wasmModuleRoots = moduleRoots c.name = name + c.stylusArch = stylusArch return nil } @@ -94,6 +106,13 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) { return nil, errors.New("not started") } +func (c *ValidationClient) StylusArch() string { + if c.Started() { + return c.stylusArch + } + return "not started" +} + func (c *ValidationClient) Stop() { c.StopWaiter.StopOnly() if c.client != nil { diff --git a/validator/interface.go b/validator/interface.go index 91668a3771..d59e94c4a0 100644 --- a/validator/interface.go +++ b/validator/interface.go @@ -13,6 +13,7 @@ type ValidationSpawner interface { Start(context.Context) error Stop() Name() string + StylusArch() string Room() int } diff --git a/validator/server_api/json.go b/validator/server_api/json.go index dd646e1aa1..5de6999ef1 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -10,7 +10,7 @@ import ( "os" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" + "github.com/offchainlabs/nitro/arbcompress" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/jsonapi" @@ -62,7 +62,8 @@ type InputJSON struct { BatchInfo []BatchInfoJson DelayedMsgB64 string StartState validator.GoGlobalState - UserWasms map[common.Hash]UserWasmJson + StylusArch string + UserWasms map[common.Hash]string DebugChain bool } @@ -77,11 +78,6 @@ func (i *InputJSON) WriteToFile() error { return nil } -type UserWasmJson struct { - Module string - Asm string -} - type BatchInfoJson struct { Number uint64 DataB64 string @@ -99,19 +95,20 @@ func ValidationInputToJson(entry *validator.ValidationInput) *InputJSON { DelayedMsgB64: base64.StdEncoding.EncodeToString(entry.DelayedMsg), StartState: entry.StartState, PreimagesB64: jsonPreimagesMap, - UserWasms: make(map[common.Hash]UserWasmJson), + UserWasms: make(map[common.Hash]string), + StylusArch: entry.StylusArch, DebugChain: entry.DebugChain, } for _, binfo := range entry.BatchInfo { encData := base64.StdEncoding.EncodeToString(binfo.Data) res.BatchInfo = append(res.BatchInfo, BatchInfoJson{Number: binfo.Number, DataB64: encData}) } - for moduleHash, info := range entry.UserWasms { - encWasm := UserWasmJson{ - Asm: base64.StdEncoding.EncodeToString(info.Asm), - Module: base64.StdEncoding.EncodeToString(info.Module), + for moduleHash, data := range entry.UserWasms { + compressed, err := arbcompress.CompressWell(data) + if err != nil { + entry.StylusArch = "compressError:" + err.Error() } - res.UserWasms[moduleHash] = encWasm + res.UserWasms[moduleHash] = base64.StdEncoding.EncodeToString(compressed) } return res } @@ -127,7 +124,8 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro DelayedMsgNr: entry.DelayedMsgNr, StartState: entry.StartState, Preimages: preimages, - UserWasms: make(state.UserWasms), + StylusArch: entry.StylusArch, + UserWasms: make(map[common.Hash][]byte), DebugChain: entry.DebugChain, } delayed, err := base64.StdEncoding.DecodeString(entry.DelayedMsgB64) @@ -146,20 +144,16 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro } valInput.BatchInfo = append(valInput.BatchInfo, decInfo) } - for moduleHash, info := range entry.UserWasms { - asm, err := base64.StdEncoding.DecodeString(info.Asm) + for moduleHash, encoded := range entry.UserWasms { + decoded, err := base64.StdEncoding.DecodeString(encoded) if err != nil { return nil, err } - module, err := base64.StdEncoding.DecodeString(info.Module) + uncompressed, err := arbcompress.Decompress(decoded, 30000000) if err != nil { return nil, err } - decInfo := state.ActivatedWasm{ - Asm: asm, - Module: module, - } - valInput.UserWasms[moduleHash] = decInfo + valInput.UserWasms[moduleHash] = uncompressed } return valInput, nil } diff --git a/validator/server_arb/validator_spawner.go b/validator/server_arb/validator_spawner.go index 7b9293f7bd..73549af3ec 100644 --- a/validator/server_arb/validator_spawner.go +++ b/validator/server_arb/validator_spawner.go @@ -88,6 +88,10 @@ func (s *ArbitratorSpawner) WasmModuleRoots() ([]common.Hash, error) { return s.locator.ModuleRoots(), nil } +func (s *ArbitratorSpawner) StylusArch() string { + return "wavm" +} + func (s *ArbitratorSpawner) Name() string { return "arbitrator" } @@ -118,8 +122,11 @@ func (v *ArbitratorSpawner) loadEntryToMachine(ctx context.Context, entry *valid return fmt.Errorf("error while trying to add sequencer msg for proving: %w", err) } } - for moduleHash, info := range entry.UserWasms { - err = mach.AddUserWasm(moduleHash, info.Module) + if entry.StylusArch != "wavm" { + return fmt.Errorf("bad stylus arch loaded to machine. Expected wavm. Got: %s", entry.StylusArch) + } + for moduleHash, module := range entry.UserWasms { + err = mach.AddUserWasm(moduleHash, module) if err != nil { log.Error( "error adding user wasm for proving", diff --git a/validator/server_jit/jit_machine.go b/validator/server_jit/jit_machine.go index 1a3ccfa340..3a27c64e21 100644 --- a/validator/server_jit/jit_machine.go +++ b/validator/server_jit/jit_machine.go @@ -12,6 +12,7 @@ import ( "net" "os" "os/exec" + "runtime" "time" "github.com/ethereum/go-ethereum/common" @@ -212,15 +213,18 @@ func (machine *JitMachine) prove( } // send user wasms + if entry.StylusArch != runtime.GOARCH { + return state, fmt.Errorf("bad stylus arch for validation input. got: %v, expected: %v", entry.StylusArch, runtime.GOARCH) + } userWasms := entry.UserWasms if err := writeUint32(uint32(len(userWasms))); err != nil { return state, err } - for moduleHash, info := range userWasms { + for moduleHash, program := range userWasms { if err := writeExact(moduleHash[:]); err != nil { return state, err } - if err := writeBytes(info.Asm); err != nil { + if err := writeBytes(program); err != nil { return state, err } } diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index eda74b2911..dd9f3bbfc1 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -71,6 +71,10 @@ func (v *JitSpawner) WasmModuleRoots() ([]common.Hash, error) { return v.locator.ModuleRoots(), nil } +func (v *JitSpawner) StylusArch() string { + return runtime.GOARCH +} + func (v *JitSpawner) execute( ctx context.Context, entry *validator.ValidationInput, moduleRoot common.Hash, ) (validator.GoGlobalState, error) { diff --git a/validator/validation_entry.go b/validator/validation_entry.go index 446f84ca62..3c0f28eb37 100644 --- a/validator/validation_entry.go +++ b/validator/validation_entry.go @@ -2,7 +2,6 @@ package validator import ( "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" "github.com/offchainlabs/nitro/arbutil" ) @@ -17,7 +16,8 @@ type ValidationInput struct { HasDelayedMsg bool DelayedMsgNr uint64 Preimages map[arbutil.PreimageType]map[common.Hash][]byte - UserWasms state.UserWasms + StylusArch string + UserWasms map[common.Hash][]byte BatchInfo []BatchInfo DelayedMsg []byte StartState GoGlobalState diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index 3299366821..b8787f4f02 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -44,6 +44,10 @@ func (a *ValidationServerAPI) WasmModuleRoots() ([]common.Hash, error) { return a.spawner.WasmModuleRoots() } +func (a *ValidationServerAPI) StylusArch() (string, error) { + return a.spawner.StylusArch(), nil +} + func NewValidationServerAPI(spawner validator.ValidationSpawner) *ValidationServerAPI { return &ValidationServerAPI{spawner} } From c00b5176c3369cb0a18d1ee706b5cb1cb5421579 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 19 Jul 2024 09:23:56 -0600 Subject: [PATCH 2/9] allow multiple stylusArchs per validation spawner --- staker/block_validator.go | 4 +- staker/challenge_manager.go | 2 +- staker/stateless_block_validator.go | 26 +++++++------ system_tests/validation_mock_test.go | 4 +- validator/client/redis/producer.go | 12 +++--- validator/client/validation_client.go | 29 +++++++++------ validator/interface.go | 2 +- validator/server_api/json.go | 45 +++++++++++++---------- validator/server_arb/validator_spawner.go | 14 ++++--- validator/server_jit/jit_machine.go | 14 +++++-- validator/server_jit/spawner.go | 4 +- validator/validation_entry.go | 3 +- validator/valnode/validation_api.go | 4 +- 13 files changed, 93 insertions(+), 70 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 04851b700f..8063f74d58 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -468,7 +468,7 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error { //nolint:gosec func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoot common.Hash) error { - input, err := validationEntry.ToInput("wavm") + input, err := validationEntry.ToInput([]string{"wavm"}) if err != nil { return err } @@ -814,7 +814,7 @@ validationsLoop: validatorPendingValidationsGauge.Inc(1) var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { - input, err := validationStatus.Entry.ToInput(v.chosenValidator[moduleRoot].StylusArch()) + input, err := validationStatus.Entry.ToInput(v.chosenValidator[moduleRoot].StylusArchs()) if err != nil && ctx.Err() == nil { v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err)) continue diff --git a/staker/challenge_manager.go b/staker/challenge_manager.go index f50b85064c..80cafccced 100644 --- a/staker/challenge_manager.go +++ b/staker/challenge_manager.go @@ -467,7 +467,7 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint if err != nil { return fmt.Errorf("error creating validation entry for challenge %v msg %v for execution challenge: %w", m.challengeIndex, initialCount, err) } - input, err := entry.ToInput("wavm") + input, err := entry.ToInput([]string{"wavm"}) if err != nil { return fmt.Errorf("error getting validation entry input of challenge %v msg %v: %w", m.challengeIndex, initialCount, err) } diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 7daa1ab872..65f5787e03 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -135,7 +135,7 @@ type validationEntry struct { DelayedMsg []byte } -func (e *validationEntry) ToInput(stylusArch string) (*validator.ValidationInput, error) { +func (e *validationEntry) ToInput(stylusArchs []string) (*validator.ValidationInput, error) { if e.Stage != Ready { return nil, errors.New("cannot create input from non-ready entry") } @@ -144,20 +144,24 @@ func (e *validationEntry) ToInput(stylusArch string) (*validator.ValidationInput HasDelayedMsg: e.HasDelayedMsg, DelayedMsgNr: e.DelayedMsgNr, Preimages: e.Preimages, - StylusArch: stylusArch, - UserWasms: make(map[common.Hash][]byte, len(e.UserWasms)), + UserWasms: make(map[string]map[common.Hash][]byte, len(e.UserWasms)), BatchInfo: e.BatchInfo, DelayedMsg: e.DelayedMsg, StartState: e.Start, DebugChain: e.ChainConfig.DebugMode(), } + for _, stylusArch := range stylusArchs { + res.UserWasms[stylusArch] = make(map[common.Hash][]byte) + } for hash, info := range e.UserWasms { - if stylusArch == "wavm" { - res.UserWasms[hash] = info.Module - } else if stylusArch == runtime.GOARCH { - res.UserWasms[hash] = info.Asm - } else { - return nil, fmt.Errorf("stylusArch not supported by block validator: %v", stylusArch) + for _, stylusArch := range stylusArchs { + if stylusArch == "wavm" { + res.UserWasms[stylusArch][hash] = info.Module + } else if stylusArch == runtime.GOARCH { + res.UserWasms[stylusArch][hash] = info.Asm + } else { + return nil, fmt.Errorf("stylusArch not supported by block validator: %v", stylusArch) + } } } return &res, nil @@ -389,7 +393,7 @@ func (v *StatelessBlockValidator) ValidateResult( if !useExec { if v.redisValidator != nil { if validator.SpawnerSupportsModule(v.redisValidator, moduleRoot) { - input, err := entry.ToInput(v.redisValidator.StylusArch()) + input, err := entry.ToInput(v.redisValidator.StylusArchs()) if err != nil { return false, nil, err } @@ -400,7 +404,7 @@ func (v *StatelessBlockValidator) ValidateResult( if run == nil { for _, spawner := range v.execSpawners { if validator.SpawnerSupportsModule(spawner, moduleRoot) { - input, err := entry.ToInput(spawner.StylusArch()) + input, err := entry.ToInput(spawner.StylusArchs()) if err != nil { return false, nil, err } diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index a1d9f314cc..2c6321d009 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -60,8 +60,8 @@ func (s *mockSpawner) WasmModuleRoots() ([]common.Hash, error) { return mockWasmModuleRoots, nil } -func (s *mockSpawner) StylusArch() string { - return "mock" +func (s *mockSpawner) StylusArchs() []string { + return []string{"mock"} } func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 4a81ae1fdf..b3ad0f8839 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -23,7 +23,7 @@ type ValidationClientConfig struct { StreamPrefix string `koanf:"stream-prefix"` Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` - StylusArch string `koanf:"stylus-arch"` + StylusArchs []string `koanf:"stylus-archs"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` CreateStreams bool `koanf:"create-streams"` } @@ -36,7 +36,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{ Name: "redis validation client", Room: 2, RedisURL: "", - StylusArch: "wavm", + StylusArchs: []string{"wavm"}, ProducerConfig: pubsub.DefaultProducerConfig, CreateStreams: true, } @@ -46,7 +46,7 @@ var TestValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", StreamPrefix: "test-", - StylusArch: "wavm", + StylusArchs: []string{"wavm"}, ProducerConfig: pubsub.TestProducerConfig, CreateStreams: false, } @@ -56,7 +56,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room") f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url") f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name") - f.String(prefix+".stylus-arch", DefaultValidationClientConfig.StylusArch, "arch for stylus workers") + f.StringSlice(prefix+".stylus-archs", DefaultValidationClientConfig.StylusArchs, "archs required for stylus workers") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } @@ -152,8 +152,8 @@ func (c *ValidationClient) Name() string { return c.config.Name } -func (c *ValidationClient) StylusArch() string { - return c.config.StylusArch +func (c *ValidationClient) StylusArchs() []string { + return c.config.StylusArchs } func (c *ValidationClient) Room() int { diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 610f3eb01f..43231b913e 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -30,16 +30,16 @@ type ValidationClient struct { stopwaiter.StopWaiter client *rpcclient.RpcClient name string - stylusArch string + stylusArchs []string room atomic.Int32 wasmModuleRoots []common.Hash } func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ValidationClient { return &ValidationClient{ - client: rpcclient.NewRpcClient(config, stack), - name: "not started", - stylusArch: "not started", + client: rpcclient.NewRpcClient(config, stack), + name: "not started", + stylusArchs: []string{"not started"}, } } @@ -68,12 +68,17 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { if len(name) == 0 { return errors.New("couldn't read name from server") } - var stylusArch string - if err := c.client.CallContext(ctx, &stylusArch, server_api.Namespace+"_stylusArch"); err != nil { + var stylusArchs []string + if err := c.client.CallContext(ctx, &stylusArchs, server_api.Namespace+"_stylusArchs"); err != nil { return err } - if stylusArch != "wavm" && stylusArch != runtime.GOARCH && stylusArch != "mock" { - return fmt.Errorf("unsupported stylus architecture: %v", stylusArch) + if len(stylusArchs) == 0 { + return fmt.Errorf("could not read stylus archs from validation server") + } + for _, stylusArch := range stylusArchs { + if stylusArch != "wavm" && stylusArch != runtime.GOARCH && stylusArch != "mock" { + return fmt.Errorf("unsupported stylus architecture: %v", stylusArch) + } } var moduleRoots []common.Hash if err := c.client.CallContext(ctx, &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil { @@ -95,7 +100,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { c.room.Store(int32(room)) c.wasmModuleRoots = moduleRoots c.name = name - c.stylusArch = stylusArch + c.stylusArchs = stylusArchs return nil } @@ -106,11 +111,11 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) { return nil, errors.New("not started") } -func (c *ValidationClient) StylusArch() string { +func (c *ValidationClient) StylusArchs() []string { if c.Started() { - return c.stylusArch + return c.stylusArchs } - return "not started" + return []string{"not started"} } func (c *ValidationClient) Stop() { diff --git a/validator/interface.go b/validator/interface.go index d59e94c4a0..80aa2c1fcc 100644 --- a/validator/interface.go +++ b/validator/interface.go @@ -13,7 +13,7 @@ type ValidationSpawner interface { Start(context.Context) error Stop() Name() string - StylusArch() string + StylusArchs() []string Room() int } diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 5de6999ef1..942013d280 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -62,8 +62,7 @@ type InputJSON struct { BatchInfo []BatchInfoJson DelayedMsgB64 string StartState validator.GoGlobalState - StylusArch string - UserWasms map[common.Hash]string + UserWasms map[string]map[common.Hash]string DebugChain bool } @@ -95,20 +94,23 @@ func ValidationInputToJson(entry *validator.ValidationInput) *InputJSON { DelayedMsgB64: base64.StdEncoding.EncodeToString(entry.DelayedMsg), StartState: entry.StartState, PreimagesB64: jsonPreimagesMap, - UserWasms: make(map[common.Hash]string), - StylusArch: entry.StylusArch, + UserWasms: make(map[string]map[common.Hash]string), DebugChain: entry.DebugChain, } for _, binfo := range entry.BatchInfo { encData := base64.StdEncoding.EncodeToString(binfo.Data) res.BatchInfo = append(res.BatchInfo, BatchInfoJson{Number: binfo.Number, DataB64: encData}) } - for moduleHash, data := range entry.UserWasms { - compressed, err := arbcompress.CompressWell(data) - if err != nil { - entry.StylusArch = "compressError:" + err.Error() + for arch, wasms := range entry.UserWasms { + archWasms := make(map[common.Hash]string) + for moduleHash, data := range wasms { + compressed, err := arbcompress.CompressWell(data) + if err != nil { + continue + } + archWasms[moduleHash] = base64.StdEncoding.EncodeToString(compressed) } - res.UserWasms[moduleHash] = base64.StdEncoding.EncodeToString(compressed) + res.UserWasms[arch] = archWasms } return res } @@ -124,8 +126,7 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro DelayedMsgNr: entry.DelayedMsgNr, StartState: entry.StartState, Preimages: preimages, - StylusArch: entry.StylusArch, - UserWasms: make(map[common.Hash][]byte), + UserWasms: make(map[string]map[common.Hash][]byte), DebugChain: entry.DebugChain, } delayed, err := base64.StdEncoding.DecodeString(entry.DelayedMsgB64) @@ -144,16 +145,20 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro } valInput.BatchInfo = append(valInput.BatchInfo, decInfo) } - for moduleHash, encoded := range entry.UserWasms { - decoded, err := base64.StdEncoding.DecodeString(encoded) - if err != nil { - return nil, err - } - uncompressed, err := arbcompress.Decompress(decoded, 30000000) - if err != nil { - return nil, err + for arch, wasms := range entry.UserWasms { + archWasms := make(map[common.Hash][]byte) + for moduleHash, encoded := range wasms { + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return nil, err + } + uncompressed, err := arbcompress.Decompress(decoded, 30000000) + if err != nil { + return nil, err + } + archWasms[moduleHash] = uncompressed } - valInput.UserWasms[moduleHash] = uncompressed + valInput.UserWasms[arch] = archWasms } return valInput, nil } diff --git a/validator/server_arb/validator_spawner.go b/validator/server_arb/validator_spawner.go index 73549af3ec..1d4126dc7c 100644 --- a/validator/server_arb/validator_spawner.go +++ b/validator/server_arb/validator_spawner.go @@ -88,8 +88,8 @@ func (s *ArbitratorSpawner) WasmModuleRoots() ([]common.Hash, error) { return s.locator.ModuleRoots(), nil } -func (s *ArbitratorSpawner) StylusArch() string { - return "wavm" +func (s *ArbitratorSpawner) StylusArchs() []string { + return []string{"wavm"} } func (s *ArbitratorSpawner) Name() string { @@ -122,10 +122,14 @@ func (v *ArbitratorSpawner) loadEntryToMachine(ctx context.Context, entry *valid return fmt.Errorf("error while trying to add sequencer msg for proving: %w", err) } } - if entry.StylusArch != "wavm" { - return fmt.Errorf("bad stylus arch loaded to machine. Expected wavm. Got: %s", entry.StylusArch) + if len(entry.UserWasms["wavm"]) == 0 { + for stylusArch, wasms := range entry.UserWasms { + if len(wasms) > 0 { + return fmt.Errorf("bad stylus arch loaded to machine. Expected wavm. Got: %s", stylusArch) + } + } } - for moduleHash, module := range entry.UserWasms { + for moduleHash, module := range entry.UserWasms["wavm"] { err = mach.AddUserWasm(moduleHash, module) if err != nil { log.Error( diff --git a/validator/server_jit/jit_machine.go b/validator/server_jit/jit_machine.go index 3a27c64e21..e4fb840cbb 100644 --- a/validator/server_jit/jit_machine.go +++ b/validator/server_jit/jit_machine.go @@ -212,11 +212,17 @@ func (machine *JitMachine) prove( } } - // send user wasms - if entry.StylusArch != runtime.GOARCH { - return state, fmt.Errorf("bad stylus arch for validation input. got: %v, expected: %v", entry.StylusArch, runtime.GOARCH) + userWasms := entry.UserWasms[runtime.GOARCH] + + // if there are user wasms, but only for wrong architecture - error + if len(userWasms) == 0 { + for arch, userWasms := range entry.UserWasms { + if len(userWasms) != 0 { + return state, fmt.Errorf("bad stylus arch for validation input. got: %v, expected: %v", arch, runtime.GOARCH) + } + } } - userWasms := entry.UserWasms + if err := writeUint32(uint32(len(userWasms))); err != nil { return state, err } diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index dd9f3bbfc1..5ba3664109 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -71,8 +71,8 @@ func (v *JitSpawner) WasmModuleRoots() ([]common.Hash, error) { return v.locator.ModuleRoots(), nil } -func (v *JitSpawner) StylusArch() string { - return runtime.GOARCH +func (v *JitSpawner) StylusArchs() []string { + return []string{runtime.GOARCH} } func (v *JitSpawner) execute( diff --git a/validator/validation_entry.go b/validator/validation_entry.go index 3c0f28eb37..133a67a8a8 100644 --- a/validator/validation_entry.go +++ b/validator/validation_entry.go @@ -16,8 +16,7 @@ type ValidationInput struct { HasDelayedMsg bool DelayedMsgNr uint64 Preimages map[arbutil.PreimageType]map[common.Hash][]byte - StylusArch string - UserWasms map[common.Hash][]byte + UserWasms map[string]map[common.Hash][]byte BatchInfo []BatchInfo DelayedMsg []byte StartState GoGlobalState diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index b8787f4f02..6245ffc5e3 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -44,8 +44,8 @@ func (a *ValidationServerAPI) WasmModuleRoots() ([]common.Hash, error) { return a.spawner.WasmModuleRoots() } -func (a *ValidationServerAPI) StylusArch() (string, error) { - return a.spawner.StylusArch(), nil +func (a *ValidationServerAPI) StylusArchs() ([]string, error) { + return a.spawner.StylusArchs(), nil } func NewValidationServerAPI(spawner validator.ValidationSpawner) *ValidationServerAPI { From 4ac4001c0c99ea8d6dcfd42b74c541f1c02fe7c6 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 19 Jul 2024 18:37:40 -0600 Subject: [PATCH 3/9] validation input: use compression level 1 --- validator/server_api/json.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 942013d280..173429d9b1 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -104,7 +104,7 @@ func ValidationInputToJson(entry *validator.ValidationInput) *InputJSON { for arch, wasms := range entry.UserWasms { archWasms := make(map[common.Hash]string) for moduleHash, data := range wasms { - compressed, err := arbcompress.CompressWell(data) + compressed, err := arbcompress.CompressLevel(data, 1) if err != nil { continue } From 8cd0d648428243d04ce412c71bf61e29559b2d34 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Mon, 22 Jul 2024 17:33:03 -0600 Subject: [PATCH 4/9] block_validator: add histograms --- staker/block_validator.go | 45 ++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 8063f74d58..a354bafc50 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -31,13 +31,18 @@ import ( ) var ( - validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil) - validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) - validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) - validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) - validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) - validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) - validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) + validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil) + validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) + validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) + validatorValidationWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRecord", nil, metrics.NewBoundedHistogramSample()) + validatorValidationRecordingHist = metrics.NewRegisteredHistogram("arb/validator/validations/recording", nil, metrics.NewBoundedHistogramSample()) + validatorValidationWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRun", nil, metrics.NewBoundedHistogramSample()) + validatorValidationLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRun", nil, metrics.NewBoundedHistogramSample()) + validatorValidationRunningHist = metrics.NewRegisteredHistogram("arb/validator/validations/running", nil, metrics.NewBoundedHistogramSample()) + validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) + validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) + validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) + validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) ) type BlockValidator struct { @@ -210,10 +215,11 @@ const ( ) type validationStatus struct { - Status atomic.Uint32 // atomic: value is one of validationStatus* - Cancel func() // non-atomic: only read/written to with reorg mutex - Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared - Runs []validator.ValidationRun // if status >= ValidationSent + Status atomic.Uint32 // atomic: value is one of validationStatus* + Cancel func() // non-atomic: only read/written to with reorg mutex + Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared + Runs []validator.ValidationRun // if status >= ValidationSent + tsMilli int64 } func (s *validationStatus) getStatus() valStatusField { @@ -225,6 +231,12 @@ func (s *validationStatus) replaceStatus(old, new valStatusField) bool { return s.Status.CompareAndSwap(uint32(old), uint32(new)) } +func (s *validationStatus) timeStampInterval() int64 { + start := s.tsMilli + s.tsMilli = time.Now().UnixMilli() + return s.tsMilli - start +} + func NewBlockValidator( statelessBlockValidator *StatelessBlockValidator, inbox InboxTrackerInterface, @@ -447,6 +459,8 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error { if !s.replaceStatus(Created, RecordSent) { return fmt.Errorf("failed status check for send record. Status: %v", s.getStatus()) } + + validatorValidationWaitToRecordHist.Update(s.timeStampInterval()) v.LaunchThread(func(ctx context.Context) { err := v.ValidationEntryRecord(ctx, s.Entry) if ctx.Err() != nil { @@ -457,11 +471,11 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error { log.Error("Error while recording", "err", err, "status", s.getStatus()) return } + validatorValidationRecordingHist.Update(s.timeStampInterval()) if !s.replaceStatus(RecordSent, Prepared) { log.Error("Fault trying to update validation with recording", "entry", s.Entry, "status", s.getStatus()) return } - nonBlockingTrigger(v.progressValidationsChan) }) return nil } @@ -585,7 +599,8 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e return false, err } status := &validationStatus{ - Entry: entry, + Entry: entry, + tsMilli: time.Now().UnixMilli(), } status.Status.Store(uint32(Created)) v.validations.Store(pos, status) @@ -811,6 +826,7 @@ validationsLoop: if !replaced { v.possiblyFatal(errors.New("failed to set SendingValidation status")) } + validatorValidationWaitToLaunchHist.Update(validationStatus.timeStampInterval()) validatorPendingValidationsGauge.Inc(1) var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { @@ -823,12 +839,14 @@ validationsLoop: log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot) runs = append(runs, run) } + validatorValidationLaunchHist.Update(validationStatus.timeStampInterval()) validationCtx, cancel := context.WithCancel(ctx) validationStatus.Runs = runs validationStatus.Cancel = cancel v.LaunchUntrackedThread(func() { defer validatorPendingValidationsGauge.Dec(1) defer cancel() + startTsMilli := validationStatus.tsMilli replaced = validationStatus.replaceStatus(SendingValidation, ValidationSent) if !replaced { v.possiblyFatal(errors.New("failed to set status to ValidationSent")) @@ -842,6 +860,7 @@ validationsLoop: return } } + validatorValidationRunningHist.Update(time.Now().UnixMilli() - startTsMilli) nonBlockingTrigger(v.progressValidationsChan) }) } From 9ea15b9025234b5a1c8a8dc285f94a57eb49cfc4 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 23 Jul 2024 14:44:43 -0600 Subject: [PATCH 5/9] block_validator: fix metrics --- staker/block_validator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index a354bafc50..13ece78405 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -36,8 +36,8 @@ var ( validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) validatorValidationWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRecord", nil, metrics.NewBoundedHistogramSample()) validatorValidationRecordingHist = metrics.NewRegisteredHistogram("arb/validator/validations/recording", nil, metrics.NewBoundedHistogramSample()) - validatorValidationWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRun", nil, metrics.NewBoundedHistogramSample()) - validatorValidationLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRun", nil, metrics.NewBoundedHistogramSample()) + validatorValidationWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToLaunch", nil, metrics.NewBoundedHistogramSample()) + validatorValidationLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/validations/launching", nil, metrics.NewBoundedHistogramSample()) validatorValidationRunningHist = metrics.NewRegisteredHistogram("arb/validator/validations/running", nil, metrics.NewBoundedHistogramSample()) validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) @@ -839,7 +839,7 @@ validationsLoop: log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot) runs = append(runs, run) } - validatorValidationLaunchHist.Update(validationStatus.timeStampInterval()) + validatorValidationLaunchingHist.Update(validationStatus.timeStampInterval()) validationCtx, cancel := context.WithCancel(ctx) validationStatus.Runs = runs validationStatus.Cancel = cancel From f96aafd80720e0cfbaee455244b2101e96031976 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 23 Jul 2024 19:01:35 -0600 Subject: [PATCH 6/9] json to input: retry if decompress too large --- arbcompress/native.go | 10 +++++++++- validator/server_api/json.go | 19 ++++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/arbcompress/native.go b/arbcompress/native.go index 4624d6222e..8244010979 100644 --- a/arbcompress/native.go +++ b/arbcompress/native.go @@ -12,7 +12,10 @@ package arbcompress #include "arbitrator.h" */ import "C" -import "fmt" +import ( + "errors" + "fmt" +) type u8 = C.uint8_t type u32 = C.uint32_t @@ -44,6 +47,8 @@ func Compress(input []byte, level uint32, dictionary Dictionary) ([]byte, error) return output, nil } +var ErrOutputWontFit = errors.New("output won't fit in maxsize") + func Decompress(input []byte, maxSize int) ([]byte, error) { return DecompressWithDictionary(input, maxSize, EmptyDictionary) } @@ -54,6 +59,9 @@ func DecompressWithDictionary(input []byte, maxSize int, dictionary Dictionary) inbuf := sliceToBuffer(input) status := C.brotli_decompress(inbuf, outbuf, C.Dictionary(dictionary)) + if status == C.BrotliStatus_NeedsMoreOutput { + return nil, ErrOutputWontFit + } if status != C.BrotliStatus_Success { return nil, fmt.Errorf("failed decompression: %d", status) } diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 173429d9b1..d93bc2f4e7 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -6,6 +6,7 @@ package server_api import ( "encoding/base64" "encoding/json" + "errors" "fmt" "os" @@ -152,9 +153,21 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro if err != nil { return nil, err } - uncompressed, err := arbcompress.Decompress(decoded, 30000000) - if err != nil { - return nil, err + maxSize := 2_000_000 + var uncompressed []byte + for { + uncompressed, err = arbcompress.Decompress(decoded, maxSize) + if errors.Is(err, arbcompress.ErrOutputWontFit) { + if maxSize >= 128_000_000 { + return nil, errors.New("failed decompression: too large") + } + maxSize = maxSize * 4 + continue + } + if err != nil { + return nil, err + } + break } archWasms[moduleHash] = uncompressed } From 789b96942c9105708e49d63dc4bdbf14f555abaa Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 23 Jul 2024 21:23:04 -0600 Subject: [PATCH 7/9] validator/toInput: increase limit --- validator/server_api/json.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/server_api/json.go b/validator/server_api/json.go index d93bc2f4e7..90746e4c57 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -158,7 +158,7 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro for { uncompressed, err = arbcompress.Decompress(decoded, maxSize) if errors.Is(err, arbcompress.ErrOutputWontFit) { - if maxSize >= 128_000_000 { + if maxSize >= 512_000_000 { return nil, errors.New("failed decompression: too large") } maxSize = maxSize * 4 From a8284d5238a9732e4fe8731f23d0782cf8c528d5 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 26 Jul 2024 11:54:25 -0600 Subject: [PATCH 8/9] block_validator: update profiling --- staker/block_validator.go | 73 +++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 13ece78405..03367a50b0 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -31,18 +31,18 @@ import ( ) var ( - validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil) - validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) - validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) - validatorValidationWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToRecord", nil, metrics.NewBoundedHistogramSample()) - validatorValidationRecordingHist = metrics.NewRegisteredHistogram("arb/validator/validations/recording", nil, metrics.NewBoundedHistogramSample()) - validatorValidationWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/validations/waitToLaunch", nil, metrics.NewBoundedHistogramSample()) - validatorValidationLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/validations/launching", nil, metrics.NewBoundedHistogramSample()) - validatorValidationRunningHist = metrics.NewRegisteredHistogram("arb/validator/validations/running", nil, metrics.NewBoundedHistogramSample()) - validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) - validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) - validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) - validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) + validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil) + validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) + validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) + validatorProfileWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_record", nil, metrics.NewBoundedHistogramSample()) + validatorProfileRecordingHist = metrics.NewRegisteredHistogram("arb/validator/profile/recording", nil, metrics.NewBoundedHistogramSample()) + validatorProfileWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_launch", nil, metrics.NewBoundedHistogramSample()) + validatorProfileLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/profile/launching", nil, metrics.NewBoundedHistogramSample()) + validatorProfileRunningHist = metrics.NewRegisteredHistogram("arb/validator/profile/running", nil, metrics.NewBoundedHistogramSample()) + validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) + validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) + validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) + validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) ) type BlockValidator struct { @@ -215,11 +215,11 @@ const ( ) type validationStatus struct { - Status atomic.Uint32 // atomic: value is one of validationStatus* - Cancel func() // non-atomic: only read/written to with reorg mutex - Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared - Runs []validator.ValidationRun // if status >= ValidationSent - tsMilli int64 + Status atomic.Uint32 // atomic: value is one of validationStatus* + Cancel func() // non-atomic: only read/written to with reorg mutex + Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared + Runs []validator.ValidationRun // if status >= ValidationSent + profileTS int64 // time-stamp for profiling } func (s *validationStatus) getStatus() valStatusField { @@ -231,10 +231,11 @@ func (s *validationStatus) replaceStatus(old, new valStatusField) bool { return s.Status.CompareAndSwap(uint32(old), uint32(new)) } -func (s *validationStatus) timeStampInterval() int64 { - start := s.tsMilli - s.tsMilli = time.Now().UnixMilli() - return s.tsMilli - start +// gets how many miliseconds last step took, and starts measuring a new step +func (s *validationStatus) profileStep() int64 { + start := s.profileTS + s.profileTS = time.Now().UnixMilli() + return s.profileTS - start } func NewBlockValidator( @@ -460,7 +461,7 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error { return fmt.Errorf("failed status check for send record. Status: %v", s.getStatus()) } - validatorValidationWaitToRecordHist.Update(s.timeStampInterval()) + validatorProfileWaitToRecordHist.Update(s.profileStep()) v.LaunchThread(func(ctx context.Context) { err := v.ValidationEntryRecord(ctx, s.Entry) if ctx.Err() != nil { @@ -471,11 +472,12 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error { log.Error("Error while recording", "err", err, "status", s.getStatus()) return } - validatorValidationRecordingHist.Update(s.timeStampInterval()) + validatorProfileRecordingHist.Update(s.profileStep()) if !s.replaceStatus(RecordSent, Prepared) { log.Error("Fault trying to update validation with recording", "entry", s.Entry, "status", s.getStatus()) return } + nonBlockingTrigger(v.progressValidationsChan) }) return nil } @@ -599,8 +601,8 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e return false, err } status := &validationStatus{ - Entry: entry, - tsMilli: time.Now().UnixMilli(), + Entry: entry, + profileTS: time.Now().UnixMilli(), } status.Status.Store(uint32(Created)) v.validations.Store(pos, status) @@ -807,12 +809,13 @@ validationsLoop: continue } for _, moduleRoot := range wasmRoots { - if v.chosenValidator[moduleRoot] == nil { + spawner := v.chosenValidator[moduleRoot] + if spawner == nil { notFoundErr := fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot) v.possiblyFatal(notFoundErr) return nil, notFoundErr } - if v.chosenValidator[moduleRoot].Room() == 0 { + if spawner.Room() == 0 { log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot) return nil, nil } @@ -826,27 +829,31 @@ validationsLoop: if !replaced { v.possiblyFatal(errors.New("failed to set SendingValidation status")) } - validatorValidationWaitToLaunchHist.Update(validationStatus.timeStampInterval()) + validatorProfileWaitToLaunchHist.Update(validationStatus.profileStep()) validatorPendingValidationsGauge.Inc(1) var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { - input, err := validationStatus.Entry.ToInput(v.chosenValidator[moduleRoot].StylusArchs()) + spawner := v.chosenValidator[moduleRoot] + input, err := validationStatus.Entry.ToInput(spawner.StylusArchs()) if err != nil && ctx.Err() == nil { v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err)) continue } - run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot) + if ctx.Err() != nil { + return nil, ctx.Err() + } + run := spawner.Launch(input, moduleRoot) log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot) runs = append(runs, run) } - validatorValidationLaunchingHist.Update(validationStatus.timeStampInterval()) + validatorProfileLaunchingHist.Update(validationStatus.profileStep()) validationCtx, cancel := context.WithCancel(ctx) validationStatus.Runs = runs validationStatus.Cancel = cancel v.LaunchUntrackedThread(func() { defer validatorPendingValidationsGauge.Dec(1) defer cancel() - startTsMilli := validationStatus.tsMilli + startTsMilli := validationStatus.profileTS replaced = validationStatus.replaceStatus(SendingValidation, ValidationSent) if !replaced { v.possiblyFatal(errors.New("failed to set status to ValidationSent")) @@ -860,7 +867,7 @@ validationsLoop: return } } - validatorValidationRunningHist.Update(time.Now().UnixMilli() - startTsMilli) + validatorProfileRunningHist.Update(time.Now().UnixMilli() - startTsMilli) nonBlockingTrigger(v.progressValidationsChan) }) } From b7b2dd6b92522f7d0f3170b8f94d361f1b6494a5 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 26 Jul 2024 13:10:41 -0600 Subject: [PATCH 9/9] validation_client: start after reading data --- validator/client/validation_client.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 43231b913e..d6743b109e 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -55,9 +55,7 @@ func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot c return server_common.NewValRun(promise, moduleRoot) } -func (c *ValidationClient) Start(ctx_in context.Context) error { - c.StopWaiter.Start(ctx_in, c) - ctx := c.GetContext() +func (c *ValidationClient) Start(ctx context.Context) error { if err := c.client.Start(ctx); err != nil { return err } @@ -101,6 +99,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { c.wasmModuleRoots = moduleRoots c.name = name c.stylusArchs = stylusArchs + c.StopWaiter.Start(ctx, c) return nil }