diff --git a/pubsub/common.go b/pubsub/common.go
new file mode 100644
index 0000000000..9f05304e46
--- /dev/null
+++ b/pubsub/common.go
@@ -0,0 +1,34 @@
+package pubsub
+
+import (
+	"context"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/go-redis/redis/v8"
+)
+
+// CreateStream creates the redis stream streamName, together with a consumer
+// group of the same name, if it does not exist yet. A creation error is
+// ignored when the stream turns out to exist already.
+func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error {
+	_, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result()
+	if err != nil && !StreamExists(ctx, streamName, client) {
+		return err
+	}
+	return nil
+}
+
+// StreamExists reports whether the redis stream streamName exists. Any error
+// from XINFO STREAM, including the one returned for a missing stream, is
+// treated as "does not exist".
+func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
+	got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
+	if err != nil {
+		// Callers poll this function while waiting for the stream to be
+		// created, so a failure here is expected and must not be logged as
+		// an error on every attempt.
+		log.Debug("Reading redis streams", "stream", streamName, "error", err)
+		return false
+	}
+	return got != nil
+}
diff --git a/pubsub/consumer.go b/pubsub/consumer.go
index c9590de8e6..df3695606d 100644
--- a/pubsub/consumer.go
+++ b/pubsub/consumer.go
@@ -86,6 +86,16 @@ func heartBeatKey(id string) string {
 	return fmt.Sprintf("consumer:%s:heartbeat", id)
 }
 
+// RedisClient returns the redis client held by the consumer.
+func (c *Consumer[Request, Response]) RedisClient() redis.UniversalClient {
+	return c.client
+}
+
+// StreamName returns the name of the consumer's redis stream.
+func (c *Consumer[Request, Response]) StreamName() string {
+	return c.redisStream
+}
+
 func (c *Consumer[Request, Response]) heartBeatKey() string {
 	return heartBeatKey(c.id)
 }
diff --git a/staker/block_validator.go b/staker/block_validator.go
index 5a511920f2..0fea05469f 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -1089,7 +1089,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
 	}
 	// First spawner is always RedisValidationClient if RedisStreams are enabled.
 	if v.redisValidator != nil {
-		err := v.redisValidator.Initialize(moduleRoots)
+		err := v.redisValidator.Initialize(ctx, moduleRoots)
 		if err != nil {
 			return err
 		}
diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go
index debd6d4c7c..54046edf15 100644
--- a/system_tests/block_validator_test.go
+++ b/system_tests/block_validator_test.go
@@ -74,6 +74,8 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
 		redisURL = redisutil.CreateTestRedis(ctx, t)
 		validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig
 		validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL
+	} else {
+		validatorConfig.BlockValidator.RedisValidationClientConfig = redis.ValidationClientConfig{}
 	}
 
 	AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL)
diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go
index 1055d93968..41ae100954 100644
--- a/validator/client/redis/producer.go
+++ b/validator/client/redis/producer.go
@@ -23,6 +23,7 @@ type ValidationClientConfig struct {
 	Room           int32                 `koanf:"room"`
 	RedisURL       string                `koanf:"redis-url"`
 	ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
+	CreateStreams  bool                  `koanf:"create-streams"`
 }
 
 func (c ValidationClientConfig) Enabled() bool {
@@ -34,6 +35,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{
 	Room:           2,
 	RedisURL:       "",
 	ProducerConfig: pubsub.DefaultProducerConfig,
+	CreateStreams:  true,
 }
 
 var TestValidationClientConfig = ValidationClientConfig{
@@ -41,12 +43,14 @@ var TestValidationClientConfig = ValidationClientConfig{
 	Room:           2,
 	RedisURL:       "",
 	ProducerConfig: pubsub.TestProducerConfig,
+	CreateStreams:  false,
 }
 
 func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
 	f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
 	pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
+	f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
 }
 
 // ValidationClient implements validation client through redis streams.
@@ -59,6 +63,7 @@ type ValidationClient struct {
 	producerConfig pubsub.ProducerConfig
 	redisClient    redis.UniversalClient
 	moduleRoots    []common.Hash
+	createStreams  bool
 }
 
 func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
@@ -75,11 +80,17 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
 		producers:      make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
 		producerConfig: cfg.ProducerConfig,
 		redisClient:    redisClient,
+		createStreams:  cfg.CreateStreams,
 	}, nil
 }
 
-func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error {
+func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
 	for _, mr := range moduleRoots {
+		if c.createStreams {
+			if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
+				return fmt.Errorf("creating redis stream: %w", err)
+			}
+		}
 		if _, exists := c.producers[mr]; exists {
 			log.Warn("Producer already existsw for module root", "hash", mr)
 			continue
diff --git a/validator/server_common/machine_locator.go b/validator/server_common/machine_locator.go
index 28093c30f0..71f6af60b6 100644
--- a/validator/server_common/machine_locator.go
+++ b/validator/server_common/machine_locator.go
@@ -58,7 +58,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) {
 	for _, dir := range dirs {
 		fInfo, err := os.Stat(dir)
 		if err != nil {
-			log.Warn("Getting file info", "error", err)
+			log.Warn("Getting file info", "dir", dir, "error", err)
 			continue
 		}
 		if !fInfo.IsDir() {
diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go
index 1cadaf7c9a..3569e78b5c 100644
--- a/validator/valnode/redis/consumer.go
+++ b/validator/valnode/redis/consumer.go
@@ -22,7 +22,8 @@ type ValidationServer struct {
 	spawner validator.ValidationSpawner
 
 	// consumers stores moduleRoot to consumer mapping.
-	consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]
+	consumers     map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]
+	streamTimeout time.Duration
 }
 
 func NewValidationServer(cfg *ValidationServerConfig, spawner validator.ValidationSpawner) (*ValidationServer, error) {
@@ -43,39 +44,87 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati
 		consumers[mr] = c
 	}
 	return &ValidationServer{
-		consumers: consumers,
-		spawner:   spawner,
+		consumers:     consumers,
+		spawner:       spawner,
+		streamTimeout: cfg.StreamTimeout,
 	}, nil
 }
 
+// Start starts every consumer and launches two threads per consumer: one
+// polls until the consumer's redis stream exists, the other waits for that
+// signal and then consumes and validates requests iteratively. A final thread
+// logs an error each time streamTimeout elapses before any stream is ready.
 func (s *ValidationServer) Start(ctx_in context.Context) {
 	s.StopWaiter.Start(ctx_in, s)
+	// Channel that all consumers use to indicate their readiness.
+	readyStreams := make(chan struct{}, len(s.consumers))
 	for moduleRoot, c := range s.consumers {
 		c := c
+		moduleRoot := moduleRoot
 		c.Start(ctx_in)
-		s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
-			req, err := c.Consume(ctx)
-			if err != nil {
-				log.Error("Consuming request", "error", err)
-				return 0
+		// Channel for single consumer, once readiness is indicated in this,
+		// consumer will start consuming iteratively.
+		ready := make(chan struct{}, 1)
+		s.StopWaiter.LaunchThread(func(ctx context.Context) {
+			for {
+				if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) {
+					ready <- struct{}{}
+					readyStreams <- struct{}{}
+					return
+				}
+				select {
+				case <-ctx.Done():
+					log.Info("Context done", "error", ctx.Err().Error())
+					return
+				case <-time.After(time.Millisecond * 100):
+				}
 			}
-			if req == nil {
-				// There's nothing in the queue.
-				return time.Second
-			}
-			valRun := s.spawner.Launch(req.Value, moduleRoot)
-			res, err := valRun.Await(ctx)
-			if err != nil {
-				log.Error("Error validating", "request value", req.Value, "error", err)
-				return 0
-			}
-			if err := c.SetResult(ctx, req.ID, res); err != nil {
-				log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err)
-				return 0
+		})
+		s.StopWaiter.LaunchThread(func(ctx context.Context) {
+			select {
+			case <-ctx.Done():
+				log.Info("Context done", "error", ctx.Err().Error())
+				return
+			case <-ready: // Wait until the stream exists and start consuming iteratively.
 			}
-			return time.Second
+			s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
+				req, err := c.Consume(ctx)
+				if err != nil {
+					log.Error("Consuming request", "error", err)
+					return 0
+				}
+				if req == nil {
+					// There's nothing in the queue.
+					return time.Second
+				}
+				valRun := s.spawner.Launch(req.Value, moduleRoot)
+				res, err := valRun.Await(ctx)
+				if err != nil {
+					log.Error("Error validating", "request value", req.Value, "error", err)
+					return 0
+				}
+				if err := c.SetResult(ctx, req.ID, res); err != nil {
+					log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err)
+					return 0
+				}
+				return time.Second
+			})
 		})
 	}
+	s.StopWaiter.LaunchThread(func(ctx context.Context) {
+		for {
+			select {
+			case <-readyStreams:
+				log.Trace("At least one stream is ready")
+				return // At least one stream is ready, stop watching.
+			case <-time.After(s.streamTimeout):
+				log.Error("Waiting for redis streams timed out")
+			case <-ctx.Done():
+				log.Info("Context expired, failed to start")
+				return
+			}
+		}
+	})
 }
 
 type ValidationServerConfig struct {
@@ -83,23 +132,28 @@ type ValidationServerConfig struct {
 	ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"`
 	// Supported wasm module roots.
 	ModuleRoots []string `koanf:"module-roots"`
+	// Time to wait for a redis stream to exist before logging an error.
+	StreamTimeout time.Duration `koanf:"stream-timeout"`
 }
 
 var DefaultValidationServerConfig = ValidationServerConfig{
 	RedisURL:       "",
 	ConsumerConfig: pubsub.DefaultConsumerConfig,
 	ModuleRoots:    []string{},
+	StreamTimeout:  10 * time.Minute,
 }
 
 var TestValidationServerConfig = ValidationServerConfig{
 	RedisURL:       "",
 	ConsumerConfig: pubsub.TestConsumerConfig,
 	ModuleRoots:    []string{},
+	StreamTimeout:  time.Minute,
 }
 
 func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
 	f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
+	f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams")
 }
 
 func (cfg *ValidationServerConfig) Enabled() bool {