diff --git a/pubsub/common.go b/pubsub/common.go new file mode 100644 index 0000000000..2aefa02c84 --- /dev/null +++ b/pubsub/common.go @@ -0,0 +1,17 @@ +package pubsub + +import ( + "context" + + "github.com/go-redis/redis/v8" +) + +// CreateStream tries to create stream with given name, if it already exists +// does not return an error. +func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error { + _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result() + if err == nil || err.Error() == "BUSYGROUP Consumer Group name already exists" { + return nil + } + return err +} diff --git a/staker/block_validator.go b/staker/block_validator.go index e494b3da10..d9126a27f8 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -1088,7 +1088,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { } // First spawner is always RedisValidationClient if RedisStreams are enabled. if v.redisValidator != nil { - err := v.redisValidator.Initialize(moduleRoots) + err := v.redisValidator.Initialize(ctx, moduleRoots) if err != nil { return err } diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 1055d93968..c971664bd3 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -23,6 +23,7 @@ type ValidationClientConfig struct { Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` + CreateStreams bool `koanf:"create-streams"` } func (c ValidationClientConfig) Enabled() bool { @@ -34,6 +35,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", ProducerConfig: pubsub.DefaultProducerConfig, + CreateStreams: true, } var TestValidationClientConfig = ValidationClientConfig{ @@ -41,12 +43,14 @@ var TestValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", ProducerConfig: pubsub.TestProducerConfig, + CreateStreams: true, } func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name") f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) + f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } // ValidationClient implements validation client through redis streams. @@ -78,8 +82,11 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) }, nil } -func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error { +func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + return fmt.Errorf("creating redis stream: %w", err) + } if _, exists := c.producers[mr]; exists { log.Warn("Producer already existsw for module root", "hash", mr) continue diff --git a/validator/server_common/machine_locator.go b/validator/server_common/machine_locator.go index 28093c30f0..71f6af60b6 100644 --- a/validator/server_common/machine_locator.go +++ b/validator/server_common/machine_locator.go @@ -58,7 +58,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) { for _, dir := range dirs { fInfo, err := os.Stat(dir) if err != nil { - log.Warn("Getting file info", "error", err) + log.Warn("Getting file info", "dir", dir, "error", err) continue } if !fInfo.IsDir() { diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 1cadaf7c9a..95d45589f3 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -3,10 +3,13 @@ package redis import ( "context" "fmt" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -42,12 +45,56 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati } consumers[mr] = c } + var ( + wg sync.WaitGroup + initialized atomic.Bool + ) + initialized.Store(true) + for i := 0; i < len(cfg.ModuleRoots); i++ { + mr := cfg.ModuleRoots[i] + wg.Add(1) + go func() { + defer wg.Done() + done := waitForStream(redisClient, mr) + select { + case <-time.After(cfg.StreamTimeout): + initialized.Store(false) + return + case <-done: + return + } + }() + } + wg.Wait() + if !initialized.Load() { + return nil, fmt.Errorf("waiting for streams to be created: timed out") + } return &ValidationServer{ consumers: consumers, spawner: spawner, }, nil } +func streamExists(client redis.UniversalClient, streamName string) bool { + groups, err := client.XInfoStream(context.TODO(), streamName).Result() + if err != nil { + log.Error("Reading redis streams", "error", err) + return false + } + return groups.Groups > 0 +} + +func waitForStream(client redis.UniversalClient, streamName string) chan struct{} { + var ret chan struct{} + go func() { + if streamExists(client, streamName) { + ret <- struct{}{} + } + time.Sleep(time.Millisecond * 100) + }() + return ret +} + func (s *ValidationServer) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) for moduleRoot, c := range s.consumers { @@ -83,6 +130,8 @@ type ValidationServerConfig struct { ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` // Supported wasm module roots. ModuleRoots []string `koanf:"module-roots"` + // Timeout on polling for existence of each redis stream. + StreamTimeout time.Duration `koanf:"stream-timeout"` } var DefaultValidationServerConfig = ValidationServerConfig{