Skip to content

Commit

Permalink
validation client and server: allow prefix
Browse files Browse the repository at this point in the history
off by default, this will allow multiple streams to use the same redis
  • Loading branch information
tsahee committed Jul 12, 2024
1 parent 3b36d2e commit 7afc0db
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 18 deletions.
4 changes: 2 additions & 2 deletions system_tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,12 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co
conf.Wasm.RootPath = wasmRootDir
// Enable redis streams when URL is specified
if redisURL != "" {
conf.Arbitrator.RedisValidationServerConfig = rediscons.DefaultValidationServerConfig
conf.Arbitrator.RedisValidationServerConfig = rediscons.TestValidationServerConfig
redisClient, err := redisutil.RedisClientFromURL(redisURL)
if err != nil {
t.Fatalf("Error creating redis coordinator: %v", err)
}
redisStream := server_api.RedisStreamForRoot(currentRootModule(t))
redisStream := server_api.RedisStreamForRoot(rediscons.TestValidationServerConfig.StreamPrefix, currentRootModule(t))
createRedisGroup(ctx, t, redisStream, redisClient)
conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL
t.Cleanup(func() { destroyRedisGroup(ctx, t, redisStream, redisClient) })
Expand Down
27 changes: 14 additions & 13 deletions validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

type ValidationClientConfig struct {
Name string `koanf:"name"`
StreamPrefix string `koanf:"stream-prefix"`
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
Expand All @@ -42,6 +43,7 @@ var TestValidationClientConfig = ValidationClientConfig{
Name: "test redis validation client",
Room: 2,
RedisURL: "",
StreamPrefix: "test-",
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}
Expand All @@ -50,21 +52,22 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url")
f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}

// ValidationClient implements validation client through redis streams.
type ValidationClient struct {
stopwaiter.StopWaiter
name string
room int32
config *ValidationClientConfig
name string
room int32
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
producerConfig pubsub.ProducerConfig
redisClient redis.UniversalClient
moduleRoots []common.Hash
createStreams bool
}

func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
Expand All @@ -76,19 +79,17 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
return nil, err
}
return &ValidationClient{
name: cfg.Name,
room: cfg.Room,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
producerConfig: cfg.ProducerConfig,
redisClient: redisClient,
createStreams: cfg.CreateStreams,
config: cfg,
room: cfg.Room,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
redisClient: redisClient,
}, nil
}

func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
for _, mr := range moduleRoots {
if c.createStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
if c.config.CreateStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
Expand All @@ -97,7 +98,7 @@ func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.
continue
}
p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState](
c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig)
c.redisClient, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), &c.producerConfig)
if err != nil {
log.Warn("failed init redis for %v: %w", mr, err)
continue
Expand Down Expand Up @@ -152,5 +153,5 @@ func (c *ValidationClient) Name() string {
}

func (c *ValidationClient) Room() int {
return int(c.room)
return int(atomic.LoadInt32(&c.room))
}
4 changes: 2 additions & 2 deletions validator/server_api/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func MachineStepResultFromJson(resultJson *MachineStepResultJson) (*validator.Ma
}, nil
}

func RedisStreamForRoot(moduleRoot common.Hash) string {
return fmt.Sprintf("stream:%s", moduleRoot.Hex())
func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string {
return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex())
}

type Request struct {
Expand Down
6 changes: 5 additions & 1 deletion validator/valnode/redis/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati
consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState])
for _, hash := range cfg.ModuleRoots {
mr := common.HexToHash(hash)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig)
if err != nil {
return nil, fmt.Errorf("creating consumer for validation: %w", err)
}
Expand Down Expand Up @@ -130,17 +130,20 @@ type ValidationServerConfig struct {
ModuleRoots []string `koanf:"module-roots"`
// Timeout on polling for existence of each redis stream.
StreamTimeout time.Duration `koanf:"stream-timeout"`
StreamPrefix string `koanf:"stream-prefix"`
}

var DefaultValidationServerConfig = ValidationServerConfig{
RedisURL: "",
StreamPrefix: "",
ConsumerConfig: pubsub.DefaultConsumerConfig,
ModuleRoots: []string{},
StreamTimeout: 10 * time.Minute,
}

var TestValidationServerConfig = ValidationServerConfig{
RedisURL: "",
StreamPrefix: "test-",
ConsumerConfig: pubsub.TestConsumerConfig,
ModuleRoots: []string{},
StreamTimeout: time.Minute,
Expand All @@ -150,6 +153,7 @@ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server")
f.String(prefix+".stream-prefix", DefaultValidationServerConfig.StreamPrefix, "prefix for stream name")
f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams")
}

Expand Down

0 comments on commit 7afc0db

Please sign in to comment.