Skip to content

Commit

Permalink
Move redisURL and redisStream out from producer and consumer, pass it…
Browse files Browse the repository at this point in the history
… to the constructor instead
  • Loading branch information
anodar committed Apr 19, 2024
1 parent 4674992 commit 51d4666
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 108 deletions.
53 changes: 15 additions & 38 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
Expand All @@ -21,78 +20,56 @@ type ConsumerConfig struct {
// Duration after which consumer is considered to be dead if heartbeat
// is not updated.
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Redis url for Redis streams and locks.
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

func (c ConsumerConfig) Clone() ConsumerConfig {
return ConsumerConfig{
ResponseEntryTimeout: c.ResponseEntryTimeout,
KeepAliveTimeout: c.KeepAliveTimeout,
RedisURL: c.RedisURL,
RedisStream: c.RedisStream,
RedisGroup: c.RedisGroup,
}
}

var DefaultConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var TestConsumerConfig = ConsumerConfig{
RedisStream: "",
RedisGroup: "",
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".keepalive-timeout", DefaultConsumerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-url", DefaultConsumerConfig.RedisURL, "redis url for redis stream")
f.String(prefix+".redis-stream", DefaultConsumerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultConsumerConfig.RedisGroup, "redis stream consumer group name")
}

// Consumer implements a consumer for redis stream provides heartbeat to
// indicate it is alive.
type Consumer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ConsumerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ConsumerConfig
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
func NewConsumer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if streamName == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumer := &Consumer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
}
return consumer, nil
}
Expand Down Expand Up @@ -135,11 +112,11 @@ func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.cfg.RedisGroup,
Group: c.redisGroup,
Consumer: c.id,
// Receive only messages that were never delivered to any other consumer,
// that is, only new messages.
Streams: []string{c.cfg.RedisStream, ">"},
Streams: []string{c.redisStream, ">"},
Count: 1,
Block: time.Millisecond, // 0 seems to block the read instead of immediately returning
}).Result()
Expand Down Expand Up @@ -180,7 +157,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
if _, err := c.client.XAck(ctx, c.cfg.RedisStream, c.cfg.RedisGroup, messageID).Result(); err != nil {
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
return nil
Expand Down
65 changes: 23 additions & 42 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
Expand All @@ -32,9 +31,11 @@ const (

type Producer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ProducerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ProducerConfig

promisesLock sync.RWMutex
promises map[string]*containers.Promise[Response]
Expand All @@ -49,10 +50,7 @@ type ProducerConfig struct {
// When enabled, messages that are sent to consumers that later die before
// processing them, will be re-inserted into the stream to be proceesed by
// another consumer
EnableReproduce bool `koanf:"enable-reproduce"`
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
EnableReproduce bool `koanf:"enable-reproduce"`
// Interval duration in which producer checks for pending messages delivered
// to the consumers that are currently inactive.
CheckPendingInterval time.Duration `koanf:"check-pending-interval"`
Expand All @@ -61,68 +59,51 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

func (c ProducerConfig) Clone() ProducerConfig {
return ProducerConfig{
EnableReproduce: c.EnableReproduce,
RedisURL: c.RedisURL,
RedisStream: c.RedisStream,
CheckPendingInterval: c.CheckPendingInterval,
KeepAliveTimeout: c.KeepAliveTimeout,
CheckResultInterval: c.CheckResultInterval,
RedisGroup: c.RedisGroup,
}
}

var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
}

var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.String(prefix+".redis-url", DefaultProducerConfig.RedisURL, "redis url for redis stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-stream", DefaultProducerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultProducerConfig.RedisGroup, "redis stream consumer group name")
}

func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
if client == nil {
return nil, fmt.Errorf("redis client cannot be nil")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream cannot be emtpy")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group cannot be empty")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
if streamName == "" {
return nil, fmt.Errorf("stream name cannot be empty")
}
return &Producer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
}, nil
}

Expand Down Expand Up @@ -154,7 +135,7 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
}
acked := make(map[string]Request)
for _, msg := range msgs {
if _, err := p.client.XAck(ctx, p.cfg.RedisStream, p.cfg.RedisGroup, msg.ID).Result(); err != nil {
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
continue
}
Expand Down Expand Up @@ -212,7 +193,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
return nil, fmt.Errorf("marshaling value: %w", err)
}
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.cfg.RedisStream,
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
Expand Down Expand Up @@ -260,8 +241,8 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Expand Down Expand Up @@ -297,8 +278,8 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
Expand Down
28 changes: 14 additions & 14 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ type testResponse struct {
Response string
}

func createGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
func createGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupCreateMkStream(ctx, streamName, groupName, "$").Result(); err != nil {
// Stream name and group name are the same.
if _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result(); err != nil {
t.Fatalf("Error creating stream group: %v", err)
}
}

func destroyGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
func destroyGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupDestroy(ctx, streamName, groupName).Result(); err != nil {
if _, err := client.XGroupDestroy(ctx, streamName, streamName).Result(); err != nil {
log.Debug("Error destroying a stream group", "error", err)
}
}
Expand Down Expand Up @@ -70,33 +71,32 @@ func consumerCfg() *ConsumerConfig {

func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
t.Helper()
redisURL := redisutil.CreateTestRedis(ctx, t)
redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t))
if err != nil {
t.Fatalf("RedisClientFromURL() unexpected error: %v", err)
}
prodCfg, consCfg := producerCfg(), consumerCfg()
prodCfg.RedisURL, consCfg.RedisURL = redisURL, redisURL
streamName := uuid.NewString()
groupName := fmt.Sprintf("group_%s", streamName)
prodCfg.RedisGroup, consCfg.RedisGroup = groupName, groupName
prodCfg.RedisStream, consCfg.RedisStream = streamName, streamName
streamName := fmt.Sprintf("stream:%s", uuid.NewString())
for _, o := range opts {
o.apply(consCfg, prodCfg)
}
producer, err := NewProducer[testRequest, testResponse](prodCfg)
producer, err := NewProducer[testRequest, testResponse](redisClient, streamName, prodCfg)
if err != nil {
t.Fatalf("Error creating new producer: %v", err)
}

var consumers []*Consumer[testRequest, testResponse]
for i := 0; i < consumersCount; i++ {
c, err := NewConsumer[testRequest, testResponse](consCfg)
c, err := NewConsumer[testRequest, testResponse](redisClient, streamName, consCfg)
if err != nil {
t.Fatalf("Error creating new consumer: %v", err)
}
consumers = append(consumers, c)
}
createGroup(ctx, t, streamName, groupName, producer.client)
createGroup(ctx, t, streamName, producer.client)
t.Cleanup(func() {
ctx := context.Background()
destroyGroup(ctx, t, streamName, groupName, producer.client)
destroyGroup(ctx, t, streamName, producer.client)
var keys []string
for _, c := range consumers {
keys = append(keys, c.heartBeatKey())
Expand Down
20 changes: 14 additions & 6 deletions validator/server_api/redisconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/server_api/validation"
Expand All @@ -24,18 +25,25 @@ type RedisValidationServer struct {
}

func NewRedisValidationServer(cfg *validation.RedisValidationServerConfig) (*RedisValidationServer, error) {
res := &RedisValidationServer{}
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState])
for _, hash := range cfg.ModuleRoots {
mr := common.HexToHash(hash)
conf := cfg.ConsumerConfig.Clone()
conf.RedisStream, conf.RedisGroup = redisStreamForRoot(mr), redisGroupForRoot(mr)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](&conf)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, redisStreamForRoot(mr), &cfg.ConsumerConfig)
if err != nil {
return nil, fmt.Errorf("creating consumer for validation: %w", err)
}
res.consumers[mr] = c
consumers[mr] = c
}
return res, nil
return &RedisValidationServer{
consumers: consumers,
}, nil
}

func (s *RedisValidationServer) Start(ctx_in context.Context) {
Expand Down
Loading

0 comments on commit 51d4666

Please sign in to comment.