Skip to content

Commit

Permalink
Create streams in redis client, poll on it in redis-server
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed May 15, 2024
1 parent 28033f9 commit 761e8e2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 3 deletions.
17 changes: 17 additions & 0 deletions pubsub/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pubsub

import (
"context"

"github.com/go-redis/redis/v8"
)

// CreateStream tries to create stream with given name, if it already exists
// does not return an error.
func CreateStream(ctx context.Context, streamName string, client redis.UniversalClient) error {
_, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result()
if err == nil || err.Error() == "BUSYGROUP Consumer Group name already exists" {
return nil
}
return err
}
2 changes: 1 addition & 1 deletion staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
}
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
err := v.redisValidator.Initialize(ctx, moduleRoots)
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ValidationClientConfig struct {
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
CreateStreams bool `koanf:"create-streams"`
}

func (c ValidationClientConfig) Enabled() bool {
Expand All @@ -34,19 +35,22 @@ var DefaultValidationClientConfig = ValidationClientConfig{
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.DefaultProducerConfig,
CreateStreams: true,
}

var TestValidationClientConfig = ValidationClientConfig{
Name: "test redis validation client",
Room: 2,
RedisURL: "",
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: true,
}

func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}

// ValidationClient implements validation client through redis streams.
Expand Down Expand Up @@ -78,8 +82,11 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
}, nil
}

func (c *ValidationClient) Initialize(moduleRoots []common.Hash) error {
func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
for _, mr := range moduleRoots {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already existsw for module root", "hash", mr)
continue
Expand Down
2 changes: 1 addition & 1 deletion validator/server_common/machine_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewMachineLocator(rootPath string) (*MachineLocator, error) {
for _, dir := range dirs {
fInfo, err := os.Stat(dir)
if err != nil {
log.Warn("Getting file info", "error", err)
log.Warn("Getting file info", "dir", dir, "error", err)
continue
}
if !fInfo.IsDir() {
Expand Down
49 changes: 49 additions & 0 deletions validator/valnode/redis/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package redis
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -42,12 +45,56 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati
}
consumers[mr] = c
}
var (
wg sync.WaitGroup
initialized atomic.Bool
)
initialized.Store(true)
for i := 0; i < len(cfg.ModuleRoots); i++ {
mr := cfg.ModuleRoots[i]
wg.Add(1)
go func() {
defer wg.Done()
done := waitForStream(redisClient, mr)
select {
case <-time.After(cfg.StreamTimeout):
initialized.Store(false)
return
case <-done:
return
}
}()
}
wg.Wait()
if !initialized.Load() {
return nil, fmt.Errorf("waiting for streams to be created: timed out")
}
return &ValidationServer{
consumers: consumers,
spawner: spawner,
}, nil
}

func streamExists(client redis.UniversalClient, streamName string) bool {
groups, err := client.XInfoStream(context.TODO(), streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
return false
}
return groups.Groups > 0
}

func waitForStream(client redis.UniversalClient, streamName string) chan struct{} {
var ret chan struct{}
go func() {
if streamExists(client, streamName) {
ret <- struct{}{}
}
time.Sleep(time.Millisecond * 100)
}()
return ret
}

func (s *ValidationServer) Start(ctx_in context.Context) {
s.StopWaiter.Start(ctx_in, s)
for moduleRoot, c := range s.consumers {
Expand Down Expand Up @@ -83,6 +130,8 @@ type ValidationServerConfig struct {
ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"`
// Supported wasm module roots.
ModuleRoots []string `koanf:"module-roots"`
// Timeout on polling for existence of each redis stream.
StreamTimeout time.Duration `koanf:"stream-timeout"`
}

var DefaultValidationServerConfig = ValidationServerConfig{
Expand Down

0 comments on commit 761e8e2

Please sign in to comment.