From 7afc0db030b0c94fd5eb30e0b6bb079e016d6b9f Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 12 Jul 2024 15:30:02 -0600 Subject: [PATCH] validation client and server: allow prefix off by default, this will allow multiple streams to use the same redis --- system_tests/common_test.go | 4 ++-- validator/client/redis/producer.go | 27 ++++++++++++++------------- validator/server_api/json.go | 4 ++-- validator/valnode/redis/consumer.go | 6 +++++- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 16d6b2f131..1a11abf89b 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -624,12 +624,12 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co conf.Wasm.RootPath = wasmRootDir // Enable redis streams when URL is specified if redisURL != "" { - conf.Arbitrator.RedisValidationServerConfig = rediscons.DefaultValidationServerConfig + conf.Arbitrator.RedisValidationServerConfig = rediscons.TestValidationServerConfig redisClient, err := redisutil.RedisClientFromURL(redisURL) if err != nil { t.Fatalf("Error creating redis coordinator: %v", err) } - redisStream := server_api.RedisStreamForRoot(currentRootModule(t)) + redisStream := server_api.RedisStreamForRoot(rediscons.TestValidationServerConfig.StreamPrefix, currentRootModule(t)) createRedisGroup(ctx, t, redisStream, redisClient) conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL t.Cleanup(func() { destroyRedisGroup(ctx, t, redisStream, redisClient) }) diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 4aa4031350..fad64489b0 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -20,6 +20,7 @@ import ( type ValidationClientConfig struct { Name string `koanf:"name"` + StreamPrefix string `koanf:"stream-prefix"` Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` @@ -42,6 +43,7 @@ var TestValidationClientConfig = ValidationClientConfig{ Name: "test redis validation client", Room: 2, RedisURL: "", + StreamPrefix: "test-", ProducerConfig: pubsub.TestProducerConfig, CreateStreams: false, } @@ -50,6 +52,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name") f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room") f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url") + f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } @@ -57,14 +60,14 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { // ValidationClient implements validation client through redis streams. type ValidationClient struct { stopwaiter.StopWaiter - name string - room int32 + config *ValidationClientConfig + name string + room int32 // producers stores moduleRoot to producer mapping. producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] producerConfig pubsub.ProducerConfig redisClient redis.UniversalClient moduleRoots []common.Hash - createStreams bool } func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) { @@ -76,19 +79,17 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) return nil, err } return &ValidationClient{ - name: cfg.Name, - room: cfg.Room, - producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), - producerConfig: cfg.ProducerConfig, - redisClient: redisClient, - createStreams: cfg.CreateStreams, + config: cfg, + room: cfg.Room, + producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), + redisClient: redisClient, }, nil } func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { - if c.createStreams { - if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + if c.config.CreateStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil { return fmt.Errorf("creating redis stream: %w", err) } } @@ -97,7 +98,7 @@ func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common. continue } p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState]( - c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig) + c.redisClient, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), &c.producerConfig) if err != nil { log.Warn("failed init redis for %v: %w", mr, err) continue @@ -152,5 +153,5 @@ func (c *ValidationClient) Name() string { } func (c *ValidationClient) Room() int { - return int(c.room) + return int(atomic.LoadInt32(&c.room)) } diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 3dd817d5ae..dd646e1aa1 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -45,8 +45,8 @@ func MachineStepResultFromJson(resultJson *MachineStepResultJson) (*validator.Ma }, nil } -func RedisStreamForRoot(moduleRoot common.Hash) string { - return fmt.Sprintf("stream:%s", moduleRoot.Hex()) +func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string { + return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex()) } type Request struct { diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 84f597c095..fb7db1e870 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -37,7 +37,7 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]) for _, hash := range cfg.ModuleRoots { mr := common.HexToHash(hash) - c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig) + c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig) if err != nil { return nil, fmt.Errorf("creating consumer for validation: %w", err) } @@ -130,10 +130,12 @@ type ValidationServerConfig struct { ModuleRoots []string `koanf:"module-roots"` // Timeout on polling for existence of each redis stream. StreamTimeout time.Duration `koanf:"stream-timeout"` + StreamPrefix string `koanf:"stream-prefix"` } var DefaultValidationServerConfig = ValidationServerConfig{ RedisURL: "", + StreamPrefix: "", ConsumerConfig: pubsub.DefaultConsumerConfig, ModuleRoots: []string{}, StreamTimeout: 10 * time.Minute, @@ -141,6 +143,7 @@ var DefaultValidationServerConfig = ValidationServerConfig{ var TestValidationServerConfig = ValidationServerConfig{ RedisURL: "", + StreamPrefix: "test-", ConsumerConfig: pubsub.TestConsumerConfig, ModuleRoots: []string{}, StreamTimeout: time.Minute, @@ -150,6 +153,7 @@ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server") + f.String(prefix+".stream-prefix", DefaultValidationServerConfig.StreamPrefix, "prefix for stream name") f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") }