From 761e8e2d8328dda454632f1d8a9a997505a42e56 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 15 May 2024 13:57:28 +0200 Subject: [PATCH 1/7] Create streams in redis client, poll on it in redis-server --- pubsub/common.go | 17 ++++++++ staker/block_validator.go | 2 +- validator/client/redis/producer.go | 9 +++- validator/server_common/machine_locator.go | 2 +- validator/valnode/redis/consumer.go | 49 ++++++++++++++++++++++ 5 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 pubsub/common.go diff --git a/pubsub/common.go b/pubsub/common.go new file mode 100644 index 0000000000..2aefa02c84 --- /dev/null +++ b/pubsub/common.go @@ -0,0 +1,17 @@ +package pubsub + +import ( + "context" + + "github.com/go-redis/redis/v8" +) + +// CreateStream tries to create stream with given name, if it already exists +// does not return an error. +func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error { + _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result() + if err == nil || err.Error() == "BUSYGROUP Consumer Group name already exists" { + return nil + } + return err +} diff --git a/staker/block_validator.go b/staker/block_validator.go index e494b3da10..d9126a27f8 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -1088,7 +1088,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error { } // First spawner is always RedisValidationClient if RedisStreams are enabled. if v.redisValidator != nil { - err := v.redisValidator.Initialize(moduleRoots) + err := v.redisValidator.Initialize(ctx, moduleRoots) if err != nil { return err } diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 1055d93968..c971664bd3 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -23,6 +23,7 @@ type ValidationClientConfig struct { Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` + CreateStreams bool `koanf:"create-streams"` } func (c ValidationClientConfig) Enabled() bool { @@ -34,6 +35,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", ProducerConfig: pubsub.DefaultProducerConfig, + CreateStreams: true, } var TestValidationClientConfig = ValidationClientConfig{ @@ -41,12 +43,14 @@ var TestValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", ProducerConfig: pubsub.TestProducerConfig, + CreateStreams: true, } func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name") f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) + f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } // ValidationClient implements validation client through redis streams. @@ -78,8 +82,11 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) }, nil } -func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error { +func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + return fmt.Errorf("creating redis stream: %w", err) + } if _, exists := c.producers[mr]; exists { log.Warn("Producer already existsw for module root", "hash", mr) continue diff --git a/validator/server_common/machine_locator.go b/validator/server_common/machine_locator.go index 28093c30f0..71f6af60b6 100644 --- a/validator/server_common/machine_locator.go +++ b/validator/server_common/machine_locator.go @@ -58,7 +58,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) { for _, dir := range dirs { fInfo, err := os.Stat(dir) if err != nil { - log.Warn("Getting file info", "error", err) + log.Warn("Getting file info", "dir", dir, "error", err) continue } if !fInfo.IsDir() { diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 1cadaf7c9a..95d45589f3 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -3,10 +3,13 @@ package redis import ( "context" "fmt" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -42,12 +45,56 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati } consumers[mr] = c } + var ( + wg sync.WaitGroup + initialized atomic.Bool + ) + initialized.Store(true) + for i := 0; i < len(cfg.ModuleRoots); i++ { + mr := cfg.ModuleRoots[i] + wg.Add(1) + go func() { + defer wg.Done() + done := waitForStream(redisClient, mr) + select { + case <-time.After(cfg.StreamTimeout): + initialized.Store(false) + return + case <-done: + return + } + }() + } + wg.Wait() + if !initialized.Load() { + return nil, fmt.Errorf("waiting for streams to be created: timed out") + } return &ValidationServer{ consumers: consumers, spawner: spawner, }, nil } +func streamExists(client redis.UniversalClient, streamName string) bool { + groups, err := client.XInfoStream(context.TODO(), streamName).Result() + if err != nil { + log.Error("Reading redis streams", "error", err) + return false + } + return groups.Groups > 0 +} + +func waitForStream(client redis.UniversalClient, streamName string) chan struct{} { + var ret chan struct{} + go func() { + if streamExists(client, streamName) { + ret <- struct{}{} + } + time.Sleep(time.Millisecond * 100) + }() + return ret +} + func (s *ValidationServer) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) for moduleRoot, c := range s.consumers { @@ -83,6 +130,8 @@ type ValidationServerConfig struct { ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` // Supported wasm module roots. ModuleRoots []string `koanf:"module-roots"` + // Timeout on polling for existence of each redis stream. + StreamTimeout time.Duration `koanf:"stream-timeout"` } var DefaultValidationServerConfig = ValidationServerConfig{ From 5e42b9b24c9d99bca2fe713505efea76e23940ff Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 17:15:40 +0200 Subject: [PATCH 2/7] Launch stream creation threads on Validation server start --- pubsub/common.go | 12 +++ pubsub/consumer.go | 8 ++ validator/valnode/redis/consumer.go | 125 +++++++++++++--------------- 3 files changed, 76 insertions(+), 69 deletions(-) diff --git a/pubsub/common.go b/pubsub/common.go index 2aefa02c84..bc0ab1035b 100644 --- a/pubsub/common.go +++ b/pubsub/common.go @@ -3,6 +3,7 @@ package pubsub import ( "context" + "github.com/ethereum/go-ethereum/log" "github.com/go-redis/redis/v8" ) @@ -15,3 +16,14 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal } return err } + +// StreamExists returns whether there are any consumer group for specified +// redis stream. +func StreamExists(ctx context.Context, client redis.UniversalClient, streamName string) bool { + groups, err := client.XInfoStream(ctx, streamName).Result() + if err != nil { + log.Error("Reading redis streams", "error", err) + return false + } + return groups.Groups > 0 +} diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 7a5078ee00..d7809b5f1b 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -86,6 +86,14 @@ func heartBeatKey(id string) string { return fmt.Sprintf("consumer:%s:heartbeat", id) } +func (c *Consumer[Request, Response]) RedisClient() redis.UniversalClient { + return c.client +} + +func (c *Consumer[Request, Response]) StreamName() string { + return c.redisStream +} + func (c *Consumer[Request, Response]) heartBeatKey() string { return heartBeatKey(c.id) } diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 95d45589f3..bc1cd289e7 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -3,13 +3,10 @@ package redis import ( "context" "fmt" - "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -25,7 +22,8 @@ type ValidationServer struct { spawner validator.ValidationSpawner // consumers stores moduleRoot to consumer mapping. - consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState] + consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState] + streamTimeout time.Duration } func NewValidationServer(cfg *ValidationServerConfig, spawner validator.ValidationSpawner) (*ValidationServer, error) { @@ -45,84 +43,73 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati } consumers[mr] = c } - var ( - wg sync.WaitGroup - initialized atomic.Bool - ) - initialized.Store(true) - for i := 0; i < len(cfg.ModuleRoots); i++ { - mr := cfg.ModuleRoots[i] - wg.Add(1) - go func() { - defer wg.Done() - done := waitForStream(redisClient, mr) - select { - case <-time.After(cfg.StreamTimeout): - initialized.Store(false) - return - case <-done: - return - } - }() - } - wg.Wait() - if !initialized.Load() { - return nil, fmt.Errorf("waiting for streams to be created: timed out") - } return &ValidationServer{ - consumers: consumers, - spawner: spawner, + consumers: consumers, + spawner: spawner, + streamTimeout: cfg.StreamTimeout, }, nil } -func streamExists(client redis.UniversalClient, streamName string) bool { - groups, err := client.XInfoStream(context.TODO(), streamName).Result() - if err != nil { - log.Error("Reading redis streams", "error", err) - return false - } - return groups.Groups > 0 -} - -func waitForStream(client redis.UniversalClient, streamName string) chan struct{} { - var ret chan struct{} - go func() { - if streamExists(client, streamName) { - ret <- struct{}{} - } - time.Sleep(time.Millisecond * 100) - }() - return ret -} - func (s *ValidationServer) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) + // Channel that all consumers use to indicate their readiness. + readyStreams := make(chan struct{}, len(s.consumers)) for moduleRoot, c := range s.consumers { c := c + moduleRoot := moduleRoot c.Start(ctx_in) - s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { - req, err := c.Consume(ctx) - if err != nil { - log.Error("Consuming request", "error", err) - return 0 + // Channel for single consumer, once readiness is indicated in this, + // consumer will start consuming iteratively. + ready := make(chan struct{}, 1) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + if pubsub.StreamExists(ctx, c.RedisClient(), c.StreamName()) { + ready <- struct{}{} + readyStreams <- struct{}{} + return + } + time.Sleep(time.Millisecond * 100) } - if req == nil { - // There's nothing in the queue. + }) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + <-ready // Wait until the stream exists and start consuming iteratively. + s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { + req, err := c.Consume(ctx) + if err != nil { + log.Error("Consuming request", "error", err) + return 0 + } + if req == nil { + // There's nothing in the queue. + return time.Second + } + valRun := s.spawner.Launch(req.Value, moduleRoot) + res, err := valRun.Await(ctx) + if err != nil { + log.Error("Error validating", "request value", req.Value, "error", err) + return 0 + } + if err := c.SetResult(ctx, req.ID, res); err != nil { + log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err) + return 0 + } return time.Second - } - valRun := s.spawner.Launch(req.Value, moduleRoot) - res, err := valRun.Await(ctx) - if err != nil { - log.Error("Error validating", "request value", req.Value, "error", err) - return 0 - } - if err := c.SetResult(ctx, req.ID, res); err != nil { - log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err) - return 0 - } - return time.Second + }) }) } + + for { + select { + case <-readyStreams: + log.Trace("At least one stream is ready") + return // Don't block Start if at least one of the stream is ready. + case <-time.After(s.streamTimeout): + log.Error("Waiting for redis streams timed out") + case <-ctx_in.Done(): + log.Error(("Context expired, failed to start")) + return + } + } } type ValidationServerConfig struct { From 14c661636c1040d84b3ba162fe63ce74322dd4ec Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 21:10:47 +0200 Subject: [PATCH 3/7] Address comments --- pubsub/common.go | 8 ++++---- validator/client/redis/producer.go | 10 +++++++--- validator/valnode/redis/consumer.go | 9 +++++++-- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pubsub/common.go b/pubsub/common.go index bc0ab1035b..e1dc22c909 100644 --- a/pubsub/common.go +++ b/pubsub/common.go @@ -11,15 +11,15 @@ import ( // does not return an error. func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error { _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result() - if err == nil || err.Error() == "BUSYGROUP Consumer Group name already exists" { - return nil + if err != nil && !StreamExists(ctx, streamName, client) { + return err } - return err + return nil } // StreamExists returns whether there are any consumer group for specified // redis stream. -func StreamExists(ctx context.Context, client redis.UniversalClient, streamName string) bool { +func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool { groups, err := client.XInfoStream(ctx, streamName).Result() if err != nil { log.Error("Reading redis streams", "error", err) diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index c971664bd3..41ae100954 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -43,7 +43,7 @@ var TestValidationClientConfig = ValidationClientConfig{ Room: 2, RedisURL: "", ProducerConfig: pubsub.TestProducerConfig, - CreateStreams: true, + CreateStreams: false, } func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { @@ -63,6 +63,7 @@ type ValidationClient struct { producerConfig pubsub.ProducerConfig redisClient redis.UniversalClient moduleRoots []common.Hash + createStreams bool } func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) { @@ -79,13 +80,16 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), producerConfig: cfg.ProducerConfig, redisClient: redisClient, + createStreams: cfg.CreateStreams, }, nil } func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { - if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { - return fmt.Errorf("creating redis stream: %w", err) + if c.createStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + return fmt.Errorf("creating redis stream: %w", err) + } } if _, exists := c.producers[mr]; exists { log.Warn("Producer already existsw for module root", "hash", mr) diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index bc1cd289e7..2fa25ef3c5 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -63,7 +63,7 @@ func (s *ValidationServer) Start(ctx_in context.Context) { ready := make(chan struct{}, 1) s.StopWaiter.LaunchThread(func(ctx context.Context) { for { - if pubsub.StreamExists(ctx, c.RedisClient(), c.StreamName()) { + if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) { ready <- struct{}{} readyStreams <- struct{}{} return @@ -72,7 +72,12 @@ func (s *ValidationServer) Start(ctx_in context.Context) { } }) s.StopWaiter.LaunchThread(func(ctx context.Context) { - <-ready // Wait until the stream exists and start consuming iteratively. + select { + case <-ctx.Done(): + log.Error("Context done", "error", ctx.Err().Error()) + return + case <-ready: // Wait until the stream exists and start consuming iteratively. + } s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { req, err := c.Consume(ctx) if err != nil { From f48c25658b2d82f1adaae84400abae7c87df1483 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 21:45:25 +0200 Subject: [PATCH 4/7] Address comments --- validator/valnode/redis/consumer.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 2fa25ef3c5..52c8728681 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -68,13 +68,18 @@ func (s *ValidationServer) Start(ctx_in context.Context) { readyStreams <- struct{}{} return } - time.Sleep(time.Millisecond * 100) + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-time.After(time.Millisecond * 100): + } } }) s.StopWaiter.LaunchThread(func(ctx context.Context) { select { case <-ctx.Done(): - log.Error("Context done", "error", ctx.Err().Error()) + log.Info("Context done", "error", ctx.Err().Error()) return case <-ready: // Wait until the stream exists and start consuming iteratively. } @@ -111,7 +116,7 @@ func (s *ValidationServer) Start(ctx_in context.Context) { case <-time.After(s.streamTimeout): log.Error("Waiting for redis streams timed out") case <-ctx_in.Done(): - log.Error(("Context expired, failed to start")) + log.Info(("Context expired, failed to start")) return } } @@ -130,17 +135,20 @@ var DefaultValidationServerConfig = ValidationServerConfig{ RedisURL: "", ConsumerConfig: pubsub.DefaultConsumerConfig, ModuleRoots: []string{}, + StreamTimeout: 10 * time.Minute, } var TestValidationServerConfig = ValidationServerConfig{ RedisURL: "", ConsumerConfig: pubsub.TestConsumerConfig, ModuleRoots: []string{}, + StreamTimeout: time.Minute, } func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") + f.Duration(prefix+"stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") } func (cfg *ValidationServerConfig) Enabled() bool { From 3c08b790f34637b532dfb34904e9414ca368622e Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 23 May 2024 21:55:56 +0200 Subject: [PATCH 5/7] Fix flag --- validator/valnode/redis/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 52c8728681..26c44fc5e0 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -148,7 +148,7 @@ var TestValidationServerConfig = ValidationServerConfig{ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") - f.Duration(prefix+"stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") + f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") } func (cfg *ValidationServerConfig) Enabled() bool { From 71ce50f6553b0cbec0624178c4cb3cad26904ba6 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Fri, 24 May 2024 11:11:37 +0200 Subject: [PATCH 6/7] Don't block on consumers start --- validator/valnode/redis/consumer.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 26c44fc5e0..3569e78b5c 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -107,19 +107,20 @@ func (s *ValidationServer) Start(ctx_in context.Context) { }) }) } - - for { - select { - case <-readyStreams: - log.Trace("At least one stream is ready") - return // Don't block Start if at least one of the stream is ready. - case <-time.After(s.streamTimeout): - log.Error("Waiting for redis streams timed out") - case <-ctx_in.Done(): - log.Info(("Context expired, failed to start")) - return + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + select { + case <-readyStreams: + log.Trace("At least one stream is ready") + return // Don't block Start if at least one of the stream is ready. + case <-time.After(s.streamTimeout): + log.Error("Waiting for redis streams timed out") + case <-ctx.Done(): + log.Info(("Context expired, failed to start")) + return + } } - } + }) } type ValidationServerConfig struct { From 36da838f734209419719eec7e8500bc988794089 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Fri, 24 May 2024 17:57:49 +0200 Subject: [PATCH 7/7] Fix test --- pubsub/common.go | 4 ++-- system_tests/block_validator_test.go | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pubsub/common.go b/pubsub/common.go index e1dc22c909..9f05304e46 100644 --- a/pubsub/common.go +++ b/pubsub/common.go @@ -20,10 +20,10 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal // StreamExists returns whether there are any consumer group for specified // redis stream. func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool { - groups, err := client.XInfoStream(ctx, streamName).Result() + got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result() if err != nil { log.Error("Reading redis streams", "error", err) return false } - return groups.Groups > 0 + return got != nil } diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index debd6d4c7c..54046edf15 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -74,6 +74,8 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops redisURL = redisutil.CreateTestRedis(ctx, t) validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL + } else { + validatorConfig.BlockValidator.RedisValidationClientConfig = redis.ValidationClientConfig{} } AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL)