From f48c25658b2d82f1adaae84400abae7c87df1483 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 21:45:25 +0200 Subject: [PATCH] Address comments --- validator/valnode/redis/consumer.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 2fa25ef3c5..52c8728681 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -68,13 +68,18 @@ func (s *ValidationServer) Start(ctx_in context.Context) { readyStreams <- struct{}{} return } - time.Sleep(time.Millisecond * 100) + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-time.After(time.Millisecond * 100): + } } }) s.StopWaiter.LaunchThread(func(ctx context.Context) { select { case <-ctx.Done(): - log.Error("Context done", "error", ctx.Err().Error()) + log.Info("Context done", "error", ctx.Err().Error()) return case <-ready: // Wait until the stream exists and start consuming iteratively. } @@ -111,7 +116,7 @@ func (s *ValidationServer) Start(ctx_in context.Context) { case <-time.After(s.streamTimeout): log.Error("Waiting for redis streams timed out") case <-ctx_in.Done(): - log.Error(("Context expired, failed to start")) + log.Info(("Context expired, failed to start")) return } } @@ -130,17 +135,20 @@ var DefaultValidationServerConfig = ValidationServerConfig{ RedisURL: "", ConsumerConfig: pubsub.DefaultConsumerConfig, ModuleRoots: []string{}, + StreamTimeout: 10 * time.Minute, } var TestValidationServerConfig = ValidationServerConfig{ RedisURL: "", ConsumerConfig: pubsub.TestConsumerConfig, ModuleRoots: []string{}, + StreamTimeout: time.Minute, } func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") + f.Duration(prefix+"stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") } func (cfg *ValidationServerConfig) Enabled() bool {