diff --git a/Dockerfile b/Dockerfile index 947d6b5a47..7cba82d4fc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -271,5 +271,15 @@ RUN export DEBIAN_FRONTEND=noninteractive && \ USER user +FROM nitro-node-dev as nitro-node-split +USER root + +RUN export DEBIAN_FRONTEND=noninteractive && \ + apt-get update && \ + apt-get install -y xxd netcat-traditional +COPY scripts/split-val-entry.sh /usr/local/bin +ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ] +USER user + FROM nitro-node as nitro-node-default # Just to ensure nitro-node-dist is default diff --git a/arbnode/api.go b/arbnode/api.go index 51437864d1..228ad51cf8 100644 --- a/arbnode/api.go +++ b/arbnode/api.go @@ -2,7 +2,6 @@ package arbnode import ( "context" - "errors" "fmt" "time" @@ -40,11 +39,11 @@ func (a *BlockValidatorDebugAPI) ValidateMessageNumber( if moduleRootOptional != nil { moduleRoot = *moduleRootOptional } else { - moduleRoots := a.val.GetModuleRootsToValidate() - if len(moduleRoots) == 0 { - return result, errors.New("no current WasmModuleRoot configured, must provide parameter") + var err error + moduleRoot, err = a.val.GetLatestWasmModuleRoot(ctx) + if err != nil { + return result, fmt.Errorf("no latest WasmModuleRoot configured, must provide parameter: %w", err) } - moduleRoot = moduleRoots[0] } start_time := time.Now() valid, gs, err := a.val.ValidateResult(ctx, arbutil.MessageIndex(msgNum), full, moduleRoot) diff --git a/nitro-testnode b/nitro-testnode index 3922df9caf..e530842e58 160000 --- a/nitro-testnode +++ b/nitro-testnode @@ -1 +1 @@ -Subproject commit 3922df9caf7a65dd4168b8158c1244c5fe88780e +Subproject commit e530842e583e2f3543f97a71c3a7cb53f8a10814 diff --git a/scripts/split-val-entry.sh b/scripts/split-val-entry.sh new file mode 100755 index 0000000000..a5ee0709b6 --- /dev/null +++ b/scripts/split-val-entry.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +xxd -l 32 -ps -c 40 /dev/urandom > /tmp/nitro-val.jwt + +echo launching validation servers +# To add validation server: +# > launch them here with a different port and --validation.wasm.root-path +# add their port to wait loop +# edit validation-server-configs-list to include the other nodes +/usr/local/bin/nitro-val --file-logging.enable=false --auth.addr 127.0.0.10 --auth.origins 127.0.0.1 --auth.jwtsecret /tmp/nitro-val.jwt --auth.port 52000 & +for port in 52000; do + while ! nc -w1 -z 127.0.0.10 $port; do + echo waiting for validation port $port + sleep 1 + done +done +echo launching nitro-node +/usr/local/bin/nitro --node.block-validator.pending-upgrade-module-root="0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4" --node.block-validator.validation-server-configs-list='[{"jwtsecret":"/tmp/nitro-val.jwt","url":"http://127.0.0.10:52000"}]' "$@" diff --git a/staker/block_validator.go b/staker/block_validator.go index 0cde4423c0..a7bf907521 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "regexp" "runtime" "sync" "sync/atomic" @@ -74,6 +75,13 @@ type BlockValidator struct { sendRecordChan chan struct{} progressValidationsChan chan struct{} + chosenValidator map[common.Hash]validator.ValidationSpawner + + // wasmModuleRoot + moduleMutex sync.Mutex + currentWasmModuleRoot common.Hash + pendingWasmModuleRoot common.Hash + // for testing only testingProgressMadeChan chan struct{} @@ -84,10 +92,9 @@ type BlockValidator struct { type BlockValidatorConfig struct { Enable bool `koanf:"enable"` - ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` - ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"` - ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"` + ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` + ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` @@ -96,7 +103,7 @@ type BlockValidatorConfig struct { FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` - ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"` + ValidationServerConfigsList string `koanf:"validation-server-configs-list"` memoryFreeLimit int } @@ -112,27 +119,21 @@ func (c *BlockValidatorConfig) Validate() error { c.memoryFreeLimit = limit } streamsEnabled := c.RedisValidationClientConfig.Enabled() - if c.ValidationServerConfigs == nil { + if len(c.ValidationServerConfigs) == 0 { c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer} if c.ValidationServerConfigsList != "default" { - var validationServersConfigs []rpcclient.ClientConfig - if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled { + var executionServersConfigs []rpcclient.ClientConfig + if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &executionServersConfigs); err != nil && !streamsEnabled { return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err) } - c.ValidationServerConfigs = validationServersConfigs + c.ValidationServerConfigs = executionServersConfigs } } - if len(c.ValidationServerConfigs) == 0 && !streamsEnabled { - return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config") - } - for _, serverConfig := range c.ValidationServerConfigs { - if err := serverConfig.Validate(); err != nil { - return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", serverConfig.URL, err) + for i := range c.ValidationServerConfigs { + if err := c.ValidationServerConfigs[i].Validate(); err != nil { + return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", c.ValidationServerConfigs[i].URL, err) } } - if err := c.ExecutionServerConfig.Validate(); err != nil { - return fmt.Errorf("validating execution server config: %w", err) - } return nil } @@ -145,9 +146,8 @@ type BlockValidatorConfigFetcher func() *BlockValidatorConfig func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation") rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer) - rpcclient.RPCClientAddOptions(prefix+".execution-server-config", f, &DefaultBlockValidatorConfig.ExecutionServerConfig) redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f) - f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds") + f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds") f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations") f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)") f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)") @@ -166,7 +166,6 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ Enable: false, ValidationServerConfigsList: "default", ValidationServer: rpcclient.DefaultClientConfig, - ExecutionServerConfig: rpcclient.DefaultClientConfig, RedisValidationClientConfig: redis.DefaultValidationClientConfig, ValidationPoll: time.Second, ForwardBlocks: 1024, @@ -183,7 +182,6 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ ValidationServer: rpcclient.TestClientConfig, ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, RedisValidationClientConfig: redis.TestValidationClientConfig, - ExecutionServerConfig: rpcclient.TestClientConfig, ValidationPoll: 100 * time.Millisecond, ForwardBlocks: 128, PrerecordedBlocks: uint64(2 * runtime.NumCPU()), @@ -332,6 +330,17 @@ func nonBlockingTrigger(channel chan struct{}) { } } +func (v *BlockValidator) GetModuleRootsToValidate() []common.Hash { + v.moduleMutex.Lock() + defer v.moduleMutex.Unlock() + + validatingModuleRoots := []common.Hash{v.currentWasmModuleRoot} + if v.currentWasmModuleRoot != v.pendingWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) { + validatingModuleRoots = append(validatingModuleRoots, v.pendingWasmModuleRoot) + } + return validatingModuleRoots +} + // called from NewBlockValidator, doesn't need to catch locks func ReadLastValidatedInfo(db ethdb.Database) (*GlobalStateValidatedInfo, error) { exists, err := db.Has(lastGlobalStateValidatedInfoKey) @@ -460,8 +469,13 @@ func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoo if err != nil { return err } - _, err = v.execSpawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext()) - return err + for _, spawner := range v.execSpawners { + if validator.SpawnerSupportsModule(spawner, moduleRoot) { + _, err = spawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext()) + return err + } + } + return errors.New("did not find exec spawner for wasmModuleRoot") } func (v *BlockValidator) SetCurrentWasmModuleRoot(hash common.Hash) error { @@ -704,14 +718,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa defer v.reorgMutex.RUnlock() wasmRoots := v.GetModuleRootsToValidate() - rooms := make([]int, len(v.validationSpawners)) - currentSpawnerIndex := 0 - for i, spawner := range v.validationSpawners { - here := spawner.Room() / len(wasmRoots) - if here > 0 { - rooms[i] = here - } - } pos := v.validated() - 1 // to reverse the first +1 in the loop validationsLoop: for { @@ -780,15 +786,15 @@ validationsLoop: log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash) continue } - for currentSpawnerIndex < len(rooms) { - if rooms[currentSpawnerIndex] > 0 { - break + for _, moduleRoot := range wasmRoots { + if v.chosenValidator[moduleRoot] == nil { + v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot)) + continue + } + if v.chosenValidator[moduleRoot].Room() == 0 { + log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot) + return nil, nil } - currentSpawnerIndex++ - } - if currentSpawnerIndex == len(rooms) { - log.Trace("advanceValidations: no more room", "pos", pos) - return nil, nil } if v.isMemoryLimitExceeded() { log.Warn("advanceValidations: aborting due to running low on memory") @@ -808,8 +814,8 @@ validationsLoop: defer validatorPendingValidationsGauge.Dec(1) var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { - run := v.validationSpawners[currentSpawnerIndex].Launch(input, moduleRoot) - log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", currentSpawnerIndex) + run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot) + log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot) runs = append(runs, run) } validationCtx, cancel := context.WithCancel(ctx) @@ -832,10 +838,6 @@ validationsLoop: } nonBlockingTrigger(v.progressValidationsChan) }) - rooms[currentSpawnerIndex]-- - if rooms[currentSpawnerIndex] == 0 { - currentSpawnerIndex++ - } } } } @@ -1045,10 +1047,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { currentModuleRoot := config.CurrentModuleRoot switch currentModuleRoot { case "latest": - if v.execSpawner == nil { - return fmt.Errorf(`execution spawner is nil while current module root is "latest"`) - } - latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx) + latest, err := v.GetLatestWasmModuleRoot(ctx) if err != nil { return err } @@ -1063,13 +1062,46 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { return errors.New("current-module-root config value illegal") } } + pendingModuleRoot := config.PendingUpgradeModuleRoot + if pendingModuleRoot != "" { + if pendingModuleRoot == "latest" { + latest, err := v.GetLatestWasmModuleRoot(ctx) + if err != nil { + return err + } + v.pendingWasmModuleRoot = latest + } else { + valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", pendingModuleRoot) + v.pendingWasmModuleRoot = common.HexToHash(pendingModuleRoot) + if (!valid || v.pendingWasmModuleRoot == common.Hash{}) { + return errors.New("pending-upgrade-module-root config value illegal") + } + } + } log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot) moduleRoots := []common.Hash{v.currentWasmModuleRoot} - if v.pendingWasmModuleRoot != v.currentWasmModuleRoot { + if v.pendingWasmModuleRoot != v.currentWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) { moduleRoots = append(moduleRoots, v.pendingWasmModuleRoot) } - if err := v.StatelessBlockValidator.Initialize(moduleRoots); err != nil { - return fmt.Errorf("initializing block validator with module roots: %w", err) + // First spawner is always RedisValidationClient if RedisStreams are enabled. + if v.redisValidator != nil { + err := v.redisValidator.Initialize(moduleRoots) + if err != nil { + return err + } + } + v.chosenValidator = make(map[common.Hash]validator.ValidationSpawner) + for _, root := range moduleRoots { + if v.redisValidator != nil && validator.SpawnerSupportsModule(v.redisValidator, root) { + v.chosenValidator[root] = v.redisValidator + } else { + for _, spawner := range v.execSpawners { + if validator.SpawnerSupportsModule(spawner, root) { + v.chosenValidator[root] = spawner + break + } + } + } } return nil } diff --git a/staker/challenge_manager.go b/staker/challenge_manager.go index ac2ae8835a..22897e3c1d 100644 --- a/staker/challenge_manager.go +++ b/staker/challenge_manager.go @@ -478,9 +478,18 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint } } input.BatchInfo = prunedBatches - execRun, err := m.validator.execSpawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx) - if err != nil { - return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err) + var execRun validator.ExecutionRun + for _, spawner := range m.validator.execSpawners { + if validator.SpawnerSupportsModule(spawner, m.wasmModuleRoot) { + execRun, err = spawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx) + if err != nil { + return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err) + } + break + } + } + if execRun == nil { + return fmt.Errorf("did not find valid execution backend") } backend, err := NewExecutionChallengeBackend(execRun) if err != nil { diff --git a/staker/challenge_test.go b/staker/challenge_test.go index f74e18b63d..168f76f300 100644 --- a/staker/challenge_test.go +++ b/staker/challenge_test.go @@ -193,6 +193,7 @@ func runChallengeTest( for i := 0; i < 100; i++ { if testTimeout { + backend.Commit() err = backend.AdjustTime(time.Second * 40) } Require(t, err) diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index f8e30329a7..48c638f114 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -7,8 +7,6 @@ import ( "context" "errors" "fmt" - "regexp" - "sync" "testing" "github.com/ethereum/go-ethereum/common" @@ -30,8 +28,8 @@ import ( type StatelessBlockValidator struct { config *BlockValidatorConfig - execSpawner validator.ExecutionSpawner - validationSpawners []validator.ValidationSpawner + execSpawners []validator.ExecutionSpawner + redisValidator *redis.ValidationClient recorder execution.ExecutionRecorder @@ -41,10 +39,6 @@ type StatelessBlockValidator struct { db ethdb.Database daService arbstate.DataAvailabilityReader blobReader arbstate.BlobReader - - moduleMutex sync.Mutex - currentWasmModuleRoot common.Hash - pendingWasmModuleRoot common.Hash } type BlockValidatorRegistrer interface { @@ -195,60 +189,41 @@ func NewStatelessBlockValidator( config func() *BlockValidatorConfig, stack *node.Node, ) (*StatelessBlockValidator, error) { - var validationSpawners []validator.ValidationSpawner + var executionSpawners []validator.ExecutionSpawner + var redisValClient *redis.ValidationClient + if config().RedisValidationClientConfig.Enabled() { - redisValClient, err := redis.NewValidationClient(&config().RedisValidationClientConfig) + var err error + redisValClient, err = redis.NewValidationClient(&config().RedisValidationClientConfig) if err != nil { return nil, fmt.Errorf("creating new redis validation client: %w", err) } - validationSpawners = append(validationSpawners, redisValClient) } - for _, serverConfig := range config().ValidationServerConfigs { - valConfFetcher := func() *rpcclient.ClientConfig { return &serverConfig } - validationSpawners = append(validationSpawners, validatorclient.NewValidationClient(valConfFetcher, stack)) + configs := config().ValidationServerConfigs + for i := range configs { + i := i + confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] } + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack)) } - valConfFetcher := func() *rpcclient.ClientConfig { - return &config().ExecutionServerConfig + if len(executionSpawners) == 0 { + return nil, errors.New("no enabled execution servers") } + return &StatelessBlockValidator{ - config: config(), - recorder: recorder, - validationSpawners: validationSpawners, - inboxReader: inboxReader, - inboxTracker: inbox, - streamer: streamer, - db: arbdb, - daService: das, - blobReader: blobReader, - execSpawner: validatorclient.NewExecutionClient(valConfFetcher, stack), + config: config(), + recorder: recorder, + redisValidator: redisValClient, + inboxReader: inboxReader, + inboxTracker: inbox, + streamer: streamer, + db: arbdb, + daService: das, + blobReader: blobReader, + execSpawners: executionSpawners, }, nil } -func (v *StatelessBlockValidator) Initialize(moduleRoots []common.Hash) error { - if len(v.validationSpawners) == 0 { - return nil - } - // First spawner is always RedisValidationClient if RedisStreams are enabled. - if v, ok := v.validationSpawners[0].(*redis.ValidationClient); ok { - if err := v.Initialize(moduleRoots); err != nil { - return fmt.Errorf("initializing redis validation client module roots: %w", err) - } - } - return nil -} - -func (v *StatelessBlockValidator) GetModuleRootsToValidate() []common.Hash { - v.moduleMutex.Lock() - defer v.moduleMutex.Unlock() - - validatingModuleRoots := []common.Hash{v.currentWasmModuleRoot} - if (v.currentWasmModuleRoot != v.pendingWasmModuleRoot && v.pendingWasmModuleRoot != common.Hash{}) { - validatingModuleRoots = append(validatingModuleRoots, v.pendingWasmModuleRoot) - } - return validatingModuleRoots -} - func (v *StatelessBlockValidator) ValidationEntryRecord(ctx context.Context, e *validationEntry) error { if e.Stage != ReadyForRecord { return fmt.Errorf("validation entry should be ReadyForRecord, is: %v", e.Stage) @@ -406,30 +381,29 @@ func (v *StatelessBlockValidator) ValidateResult( if err != nil { return false, nil, err } - var spawners []validator.ValidationSpawner - if useExec { - spawners = append(spawners, v.execSpawner) - } else { - spawners = v.validationSpawners + var run validator.ValidationRun + if !useExec { + if v.redisValidator != nil { + if validator.SpawnerSupportsModule(v.redisValidator, moduleRoot) { + run = v.redisValidator.Launch(input, moduleRoot) + } + } } - if len(spawners) == 0 { - return false, &entry.End, errors.New("no validation defined") + if run == nil { + for _, spawner := range v.execSpawners { + if validator.SpawnerSupportsModule(spawner, moduleRoot) { + run = spawner.Launch(input, moduleRoot) + break + } + } } - var runs []validator.ValidationRun - for _, spawner := range spawners { - run := spawner.Launch(input, moduleRoot) - runs = append(runs, run) + if run == nil { + return false, nil, fmt.Errorf("validation woth WasmModuleRoot %v not supported by node", moduleRoot) } - defer func() { - for _, run := range runs { - run.Cancel() - } - }() - for _, run := range runs { - gsEnd, err := run.Await(ctx) - if err != nil || gsEnd != entry.End { - return false, &gsEnd, err - } + defer run.Cancel() + gsEnd, err := run.Await(ctx) + if err != nil || gsEnd != entry.End { + return false, &gsEnd, err } return true, &entry.End, nil } @@ -438,36 +412,40 @@ func (v *StatelessBlockValidator) OverrideRecorder(t *testing.T, recorder execut v.recorder = recorder } -func (v *StatelessBlockValidator) Start(ctx_in context.Context) error { - for _, spawner := range v.validationSpawners { - if err := spawner.Start(ctx_in); err != nil { - return fmt.Errorf("starting validation spawner: %w", err) +func (v *StatelessBlockValidator) GetLatestWasmModuleRoot(ctx context.Context) (common.Hash, error) { + var lastErr error + for _, spawner := range v.execSpawners { + var latest common.Hash + latest, lastErr = spawner.LatestWasmModuleRoot().Await(ctx) + if latest != (common.Hash{}) && lastErr == nil { + return latest, nil + } + if ctx.Err() != nil { + return common.Hash{}, ctx.Err() } } - if err := v.execSpawner.Start(ctx_in); err != nil { - return fmt.Errorf("starting execution spawner: %w", err) + return common.Hash{}, fmt.Errorf("couldn't detect latest WasmModuleRoot: %w", lastErr) +} + +func (v *StatelessBlockValidator) Start(ctx_in context.Context) error { + if v.redisValidator != nil { + if err := v.redisValidator.Start(ctx_in); err != nil { + return fmt.Errorf("starting execution spawner: %w", err) + } } - if v.config.PendingUpgradeModuleRoot != "" { - if v.config.PendingUpgradeModuleRoot == "latest" { - latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx_in) - if err != nil { - return fmt.Errorf("getting latest wasm module root: %w", err) - } - v.pendingWasmModuleRoot = latest - } else { - valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", v.config.PendingUpgradeModuleRoot) - v.pendingWasmModuleRoot = common.HexToHash(v.config.PendingUpgradeModuleRoot) - if (!valid || v.pendingWasmModuleRoot == common.Hash{}) { - return errors.New("pending-upgrade-module-root config value illegal") - } + for _, spawner := range v.execSpawners { + if err := spawner.Start(ctx_in); err != nil { + return err } } return nil } func (v *StatelessBlockValidator) Stop() { - v.execSpawner.Stop() - for _, spawner := range v.validationSpawners { + for _, spawner := range v.execSpawners { spawner.Stop() } + if v.redisValidator != nil { + v.redisValidator.Stop() + } } diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index c64fe22f54..dfd892a079 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -74,7 +74,6 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops redisURL = redisutil.CreateTestRedis(ctx, t) validatorConfig.BlockValidator.RedisValidationClientConfig = redis.DefaultValidationClientConfig validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL - validatorConfig.BlockValidator.ValidationServerConfigs = nil } AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL) diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 8d783c4564..ad9c6fbdf2 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -586,12 +586,8 @@ func StaticFetcherFrom[T any](t *testing.T, config *T) func() *T { } func configByValidationNode(clientConfig *arbnode.Config, valStack *node.Node) { - clientConfig.BlockValidator.ExecutionServerConfig.URL = valStack.WSEndpoint() - clientConfig.BlockValidator.ExecutionServerConfig.JWTSecret = "" - if len(clientConfig.BlockValidator.ValidationServerConfigs) != 0 { - clientConfig.BlockValidator.ValidationServerConfigs[0].URL = valStack.WSEndpoint() - clientConfig.BlockValidator.ValidationServerConfigs[0].JWTSecret = "" - } + clientConfig.BlockValidator.ValidationServerConfigs[0].URL = valStack.WSEndpoint() + clientConfig.BlockValidator.ValidationServerConfigs[0].JWTSecret = "" } func currentRootModule(t *testing.T) common.Hash { diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index eec274a915..197ea1a59f 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -341,7 +341,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall } var wasmModuleRoot common.Hash if useStubs { - wasmModuleRoot = mockWasmModuleRoot + wasmModuleRoot = mockWasmModuleRoots[0] } else { wasmModuleRoot = locator.LatestWasmModuleRoot() if (wasmModuleRoot == common.Hash{}) { diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 788dfc5d7a..8f36e84f31 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -55,6 +55,10 @@ func globalstateToTestPreimages(gs validator.GoGlobalState) map[common.Hash][]by return preimages } +func (s *mockSpawner) WasmModuleRoots() ([]common.Hash, error) { + return mockWasmModuleRoots, nil +} + func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { run := &mockValRun{ Promise: containers.NewPromise[validator.GoGlobalState](nil), @@ -65,7 +69,7 @@ func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common return run } -var mockWasmModuleRoot common.Hash = common.HexToHash("0xa5a5a5") +var mockWasmModuleRoots []common.Hash = []common.Hash{common.HexToHash("0xa5a5a5"), common.HexToHash("0x1212")} func (s *mockSpawner) Start(context.Context) error { return nil @@ -83,7 +87,7 @@ func (s *mockSpawner) CreateExecutionRun(wasmModuleRoot common.Hash, input *vali } func (s *mockSpawner) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] { - return containers.NewReadyPromise[common.Hash](mockWasmModuleRoot, nil) + return containers.NewReadyPromise[common.Hash](mockWasmModuleRoots[0], nil) } func (s *mockSpawner) WriteToFile(input *validator.ValidationInput, expOut validator.GoGlobalState, moduleRoot common.Hash) containers.PromiseInterface[struct{}] { @@ -193,10 +197,21 @@ func TestValidationServerAPI(t *testing.T) { wasmRoot, err := client.LatestWasmModuleRoot().Await(ctx) Require(t, err) - if wasmRoot != mockWasmModuleRoot { + if wasmRoot != mockWasmModuleRoots[0] { t.Error("unexpected mock wasmModuleRoot") } + roots, err := client.WasmModuleRoots() + Require(t, err) + if len(roots) != len(mockWasmModuleRoots) { + Fatal(t, "wrong number of wasmModuleRoots", len(roots)) + } + for i := range roots { + if roots[i] != mockWasmModuleRoots[i] { + Fatal(t, "unexpected root", roots[i], mockWasmModuleRoots[i]) + } + } + hash1 := common.HexToHash("0x11223344556677889900aabbccddeeff") hash2 := common.HexToHash("0x11111111122222223333333444444444") diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index da184e3c16..1055d93968 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -58,6 +58,7 @@ type ValidationClient struct { producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] producerConfig pubsub.ProducerConfig redisClient redis.UniversalClient + moduleRoots []common.Hash } func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) { @@ -86,14 +87,20 @@ func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error { p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState]( c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig) if err != nil { - return fmt.Errorf("creating producer for validation: %w", err) + log.Warn("failed init redis for %v: %w", mr, err) + continue } p.Start(c.GetContext()) c.producers[mr] = p + c.moduleRoots = append(c.moduleRoots, mr) } return nil } +func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) { + return c.moduleRoots, nil +} + func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { atomic.AddInt32(&c.room, -1) defer atomic.AddInt32(&c.room, 1) diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 24e51230d6..4ec9986b1e 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "errors" + "fmt" "sync/atomic" "time" @@ -25,9 +26,10 @@ import ( type ValidationClient struct { stopwaiter.StopWaiter - client *rpcclient.RpcClient - name string - room int32 + client *rpcclient.RpcClient + name string + room int32 + wasmModuleRoots []common.Hash } func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ValidationClient { @@ -61,6 +63,13 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { if len(name) == 0 { return errors.New("couldn't read name from server") } + var moduleRoots []common.Hash + if err := c.client.CallContext(c.GetContext(), &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil { + return err + } + if len(moduleRoots) == 0 { + return fmt.Errorf("server reported no wasmModuleRoots") + } var room int if err := c.client.CallContext(c.GetContext(), &room, server_api.Namespace+"_room"); err != nil { return err @@ -72,10 +81,18 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { log.Info("connected to validation server", "name", name, "room", room) } atomic.StoreInt32(&c.room, int32(room)) + c.wasmModuleRoots = moduleRoots c.name = name return nil } +func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) { + if c.Started() { + return c.wasmModuleRoots, nil + } + return nil, errors.New("not started") +} + func (c *ValidationClient) Stop() { c.StopWaiter.StopOnly() if c.client != nil { diff --git a/validator/interface.go b/validator/interface.go index 5785ac4de1..0324b996ed 100644 --- a/validator/interface.go +++ b/validator/interface.go @@ -9,6 +9,7 @@ import ( type ValidationSpawner interface { Launch(entry *ValidationInput, moduleRoot common.Hash) ValidationRun + WasmModuleRoots() ([]common.Hash, error) Start(context.Context) error Stop() Name() string diff --git a/validator/server_arb/validator_spawner.go b/validator/server_arb/validator_spawner.go index e315b6a7fb..d745071019 100644 --- a/validator/server_arb/validator_spawner.go +++ b/validator/server_arb/validator_spawner.go @@ -84,6 +84,10 @@ func (s *ArbitratorSpawner) LatestWasmModuleRoot() containers.PromiseInterface[c return containers.NewReadyPromise(s.locator.LatestWasmModuleRoot(), nil) } +func (s *ArbitratorSpawner) WasmModuleRoots() ([]common.Hash, error) { + return s.locator.ModuleRoots(), nil +} + func (s *ArbitratorSpawner) Name() string { return "arbitrator" } diff --git a/validator/server_common/machine_locator.go b/validator/server_common/machine_locator.go index c8b4d9a165..28093c30f0 100644 --- a/validator/server_common/machine_locator.go +++ b/validator/server_common/machine_locator.go @@ -71,7 +71,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) { } for _, file := range files { mrFile := filepath.Join(dir, file.Name(), "module-root.txt") - if _, err := os.Stat(mrFile); errors.Is(err, os.ErrNotExist) { + if _, err := os.Stat(mrFile); err != nil { // Skip if module-roots file does not exist. continue } @@ -87,8 +87,11 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) { moduleRoots[moduleRoot] = true if file.Name() == "latest" { latestModuleRoot = moduleRoot - rootPath = dir } + rootPath = dir + } + if rootPath != "" { + break } } var roots []common.Hash diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index 6489821b5b..703e761af5 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -67,6 +67,10 @@ func (v *JitSpawner) Start(ctx_in context.Context) error { return nil } +func (v *JitSpawner) WasmModuleRoots() ([]common.Hash, error) { + return v.locator.ModuleRoots(), nil +} + func (v *JitSpawner) execute( ctx context.Context, entry *validator.ValidationInput, moduleRoot common.Hash, ) (validator.GoGlobalState, error) { diff --git a/validator/utils.go b/validator/utils.go new file mode 100644 index 0000000000..4c8ae65d08 --- /dev/null +++ b/validator/utils.go @@ -0,0 +1,20 @@ +package validator + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +func SpawnerSupportsModule(spawner ValidationSpawner, requested common.Hash) bool { + supported, err := spawner.WasmModuleRoots() + if err != nil { + log.Warn("WasmModuleRoots returned error", "err", err) + return false + } + for _, root := range supported { + if root == requested { + return true + } + } + return false +} diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index 432e5eedd9..f2c24689f8 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -38,6 +38,10 @@ func (a *ValidationServerAPI) Validate(ctx context.Context, entry *server_api.In return valRun.Await(ctx) } +func (a *ValidationServerAPI) WasmModuleRoots() ([]common.Hash, error) { + return a.spawner.WasmModuleRoots() +} + func NewValidationServerAPI(spawner validator.ValidationSpawner) *ValidationServerAPI { return &ValidationServerAPI{spawner} }