diff --git a/staker/block_validator.go b/staker/block_validator.go index b66fcea44c..1a601db8a9 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -134,9 +134,6 @@ func (c *BlockValidatorConfig) Validate() error { if err := c.ExecutionServerConfig.Validate(); err != nil { return fmt.Errorf("validating execution server config: %w", err) } - if err := c.RedisValidationClientConfig.Validate(); err != nil { - return fmt.Errorf("validating redis validation client configuration: %w", err) - } return nil } @@ -1068,6 +1065,9 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { } } log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot) + if err := v.StatelessBlockValidator.Initialize([]common.Hash{v.currentWasmModuleRoot, v.pendingWasmModuleRoot}); err != nil { + return fmt.Errorf("initializing block validator with module roots: %w", err) + } return nil } diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 74b87f0291..4f71e39545 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -199,7 +199,6 @@ func NewStatelessBlockValidator( redisValClient, err := validatorclient.NewRedisValidationClient(&config().RedisValidationClientConfig) if err != nil { return nil, fmt.Errorf("creating new redis validation client: %w", err) - // log.Error("Creating redis validation client, redis validator disabled", "error", err) } validationSpawners = append(validationSpawners, redisValClient) } @@ -225,6 +224,19 @@ func NewStatelessBlockValidator( }, nil } +func (v *StatelessBlockValidator) Initialize(moduleRoots []common.Hash) error { + if len(v.validationSpawners) == 0 { + return nil + } + // First spawner is always RedisValidationClient if RedisStreams are enabled. + if v, ok := v.validationSpawners[0].(*validatorclient.RedisValidationClient); ok { + if err := v.Initialize(moduleRoots); err != nil { + return fmt.Errorf("initializing redis validation client module roots: %w", err) + } + } + return nil +} + func (v *StatelessBlockValidator) GetModuleRootsToValidate() []common.Hash { v.moduleMutex.Lock() defer v.moduleMutex.Unlock() diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index ed8438eb78..a7c85bf5e4 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -74,7 +74,6 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops if useRedisStreams { redisURL = redisutil.CreateTestRedis(ctx, t) validatorConfig.BlockValidator.RedisValidationClientConfig = validatorclient.DefaultRedisValidationClientConfig - validatorConfig.BlockValidator.RedisValidationClientConfig.ModuleRoots = []string{currentRootModule(t).Hex()} validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL validatorConfig.BlockValidator.ValidationServerConfigs = nil } diff --git a/validator/client/redisproducer.go b/validator/client/redisproducer.go index bfc083daf9..07569d51b6 100644 --- a/validator/client/redisproducer.go +++ b/validator/client/redisproducer.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/redisutil" @@ -22,43 +23,17 @@ type RedisValidationClientConfig struct { Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` - // Supported wasm module roots, when the list is empty this is disabled. - ModuleRoots []string `koanf:"module-roots"` - moduleRoots []common.Hash } func (c RedisValidationClientConfig) Enabled() bool { return c.RedisURL != "" } -func (c *RedisValidationClientConfig) Validate() error { - m := make(map[string]bool) - // Add all moduleRoot hashes in case Validate is called twice so that we - // don't add duplicate moduleRoots again. - for _, mr := range c.moduleRoots { - m[mr.Hex()] = true - } - for _, mr := range c.ModuleRoots { - if _, exists := m[mr]; exists { - log.Warn("Duplicate module root", "hash", mr) - continue - } - h := common.HexToHash(mr) - if h == (common.Hash{}) { - return fmt.Errorf("invalid module root hash: %q", mr) - } - m[mr] = true - c.moduleRoots = append(c.moduleRoots, h) - } - return nil -} - var DefaultRedisValidationClientConfig = RedisValidationClientConfig{ Name: "redis validation client", Room: 2, RedisURL: "", ProducerConfig: pubsub.DefaultProducerConfig, - ModuleRoots: []string{}, } var TestRedisValidationClientConfig = RedisValidationClientConfig{ @@ -66,14 +41,12 @@ var TestRedisValidationClientConfig = RedisValidationClientConfig{ Room: 2, RedisURL: "", ProducerConfig: pubsub.TestProducerConfig, - ModuleRoots: []string{}, } func RedisValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".name", DefaultRedisValidationClientConfig.Name, "validation client name") f.Int32(prefix+".room", DefaultRedisValidationClientConfig.Room, "validation client room") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) - f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") } // RedisValidationClient implements validation client through redis streams. @@ -82,15 +55,12 @@ type RedisValidationClient struct { name string room int32 // producers stores moduleRoot to producer mapping. - producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] + producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] + producerConfig pubsub.ProducerConfig + redisClient redis.UniversalClient } func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidationClient, error) { - res := &RedisValidationClient{ - name: cfg.Name, - room: cfg.Room, - producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), - } if cfg.RedisURL == "" { return nil, fmt.Errorf("redis url cannot be empty") } @@ -98,18 +68,30 @@ func NewRedisValidationClient(cfg *RedisValidationClientConfig) (*RedisValidatio if err != nil { return nil, err } - if len(cfg.moduleRoots) == 0 { - return nil, fmt.Errorf("moduleRoots must be specified to enable redis streams") - } - for _, mr := range cfg.moduleRoots { + return &RedisValidationClient{ + name: cfg.Name, + room: cfg.Room, + producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), + producerConfig: cfg.ProducerConfig, + redisClient: redisClient, + }, nil +} + +func (c *RedisValidationClient) Initialize(moduleRoots []common.Hash) error { + for _, mr := range moduleRoots { + if _, exists := c.producers[mr]; exists { + log.Warn("Producer already existsw for module root", "hash", mr) + continue + } p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState]( - redisClient, server_api.RedisStreamForRoot(mr), &cfg.ProducerConfig) + c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig) if err != nil { - return nil, fmt.Errorf("creating producer for validation: %w", err) + return fmt.Errorf("creating producer for validation: %w", err) } - res.producers[mr] = p + p.Start(c.GetContext()) + c.producers[mr] = p } - return res, nil + return nil } func (c *RedisValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {