Skip to content

Commit

Permalink
Merge pull request #2303 from OffchainLabs/redis-stream-creation
Browse files Browse the repository at this point in the history
Create streams in redis client, poll on it in redis-server
  • Loading branch information
tsahee authored May 24, 2024
2 parents 917f828 + 41d0e59 commit 7cda2b3
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 25 deletions.
29 changes: 29 additions & 0 deletions pubsub/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pubsub

import (
"context"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
)

// CreateStream tries to create stream with given name, if it already exists
// does not return an error.
func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error {
_, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result()
if err != nil && !StreamExists(ctx, streamName, client) {
return err
}
return nil
}

// StreamExists returns whether there are any consumer group for specified
// redis stream.
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
return false
}
return got != nil
}
8 changes: 8 additions & 0 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func heartBeatKey(id string) string {
return fmt.Sprintf("consumer:%s:heartbeat", id)
}

func (c *Consumer[Request, Response]) RedisClient() redis.UniversalClient {
return c.client
}

func (c *Consumer[Request, Response]) StreamName() string {
return c.redisStream
}

func (c *Consumer[Request, Response]) heartBeatKey() string {
return heartBeatKey(c.id)
}
Expand Down
2 changes: 1 addition & 1 deletion staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
}
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
err := v.redisValidator.Initialize(ctx, moduleRoots)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions system_tests/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func testBlockValidatorSimple(t *testing.T, dasModeString string, workloadLoops
redisURL = redisutil.CreateTestRedis(ctx, t)
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.TestValidationClientConfig
validatorConfig.BlockValidator.RedisValidationClientConfig.RedisURL = redisURL
} else {
validatorConfig.BlockValidator.RedisValidationClientConfig = redis.ValidationClientConfig{}
}

AddDefaultValNode(t, ctx, validatorConfig, !arbitrator, redisURL)
Expand Down
13 changes: 12 additions & 1 deletion validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ValidationClientConfig struct {
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
CreateStreams bool `koanf:"create-streams"`
}

func (c ValidationClientConfig) Enabled() bool {
Expand All @@ -34,19 +35,22 @@ var DefaultValidationClientConfig = ValidationClientConfig{
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.DefaultProducerConfig,
CreateStreams: true,
}

var TestValidationClientConfig = ValidationClientConfig{
Name: "test redis validation client",
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}

func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}

// ValidationClient implements validation client through redis streams.
Expand All @@ -59,6 +63,7 @@ type ValidationClient struct {
producerConfig pubsub.ProducerConfig
redisClient redis.UniversalClient
moduleRoots []common.Hash
createStreams bool
}

func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
Expand All @@ -75,11 +80,17 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
producerConfig: cfg.ProducerConfig,
redisClient: redisClient,
createStreams: cfg.CreateStreams,
}, nil
}

func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error {
func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
for _, mr := range moduleRoots {
if c.createStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already existsw for module root", "hash", mr)
continue
Expand Down
2 changes: 1 addition & 1 deletion validator/server_common/machine_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) {
for _, dir := range dirs {
fInfo, err := os.Stat(dir)
if err != nil {
log.Warn("Getting file info", "error", err)
log.Warn("Getting file info", "dir", dir, "error", err)
continue
}
if !fInfo.IsDir() {
Expand Down
94 changes: 72 additions & 22 deletions validator/valnode/redis/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type ValidationServer struct {
spawner validator.ValidationSpawner

// consumers stores moduleRoot to consumer mapping.
consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]
consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]
streamTimeout time.Duration
}

func NewValidationServer(cfg *ValidationServerConfig, spawner validator.ValidationSpawner) (*ValidationServer, error) {
Expand All @@ -43,63 +44,112 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati
consumers[mr] = c
}
return &ValidationServer{
consumers: consumers,
spawner: spawner,
consumers: consumers,
spawner: spawner,
streamTimeout: cfg.StreamTimeout,
}, nil
}

func (s *ValidationServer) Start(ctx_in context.Context) {
s.StopWaiter.Start(ctx_in, s)
// Channel that all consumers use to indicate their readiness.
readyStreams := make(chan struct{}, len(s.consumers))
for moduleRoot, c := range s.consumers {
c := c
moduleRoot := moduleRoot
c.Start(ctx_in)
s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
req, err := c.Consume(ctx)
if err != nil {
log.Error("Consuming request", "error", err)
return 0
// Channel for single consumer, once readiness is indicated in this,
// consumer will start consuming iteratively.
ready := make(chan struct{}, 1)
s.StopWaiter.LaunchThread(func(ctx context.Context) {
for {
if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) {
ready <- struct{}{}
readyStreams <- struct{}{}
return
}
select {
case <-ctx.Done():
log.Info("Context done", "error", ctx.Err().Error())
return
case <-time.After(time.Millisecond * 100):
}
}
if req == nil {
// There's nothing in the queue.
return time.Second
}
valRun := s.spawner.Launch(req.Value, moduleRoot)
res, err := valRun.Await(ctx)
if err != nil {
log.Error("Error validating", "request value", req.Value, "error", err)
return 0
}
if err := c.SetResult(ctx, req.ID, res); err != nil {
log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err)
return 0
})
s.StopWaiter.LaunchThread(func(ctx context.Context) {
select {
case <-ctx.Done():
log.Info("Context done", "error", ctx.Err().Error())
return
case <-ready: // Wait until the stream exists and start consuming iteratively.
}
return time.Second
s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
req, err := c.Consume(ctx)
if err != nil {
log.Error("Consuming request", "error", err)
return 0
}
if req == nil {
// There's nothing in the queue.
return time.Second
}
valRun := s.spawner.Launch(req.Value, moduleRoot)
res, err := valRun.Await(ctx)
if err != nil {
log.Error("Error validating", "request value", req.Value, "error", err)
return 0
}
if err := c.SetResult(ctx, req.ID, res); err != nil {
log.Error("Error setting result for request", "id", req.ID, "result", res, "error", err)
return 0
}
return time.Second
})
})
}
s.StopWaiter.LaunchThread(func(ctx context.Context) {
for {
select {
case <-readyStreams:
log.Trace("At least one stream is ready")
return // Don't block Start if at least one of the stream is ready.
case <-time.After(s.streamTimeout):
log.Error("Waiting for redis streams timed out")
case <-ctx.Done():
log.Info(("Context expired, failed to start"))
return
}
}
})
}

type ValidationServerConfig struct {
RedisURL string `koanf:"redis-url"`
ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"`
// Supported wasm module roots.
ModuleRoots []string `koanf:"module-roots"`
// Timeout on polling for existence of each redis stream.
StreamTimeout time.Duration `koanf:"stream-timeout"`
}

var DefaultValidationServerConfig = ValidationServerConfig{
RedisURL: "",
ConsumerConfig: pubsub.DefaultConsumerConfig,
ModuleRoots: []string{},
StreamTimeout: 10 * time.Minute,
}

var TestValidationServerConfig = ValidationServerConfig{
RedisURL: "",
ConsumerConfig: pubsub.TestConsumerConfig,
ModuleRoots: []string{},
StreamTimeout: time.Minute,
}

func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams")
}

func (cfg *ValidationServerConfig) Enabled() bool {
Expand Down

0 comments on commit 7cda2b3

Please sign in to comment.