Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Redis streams in to Nitro's validation #2241

Merged
merged 17 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.ValidationServerConfigs[0].URL != "" {
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
Expand Down
57 changes: 18 additions & 39 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
Expand All @@ -21,68 +20,49 @@ type ConsumerConfig struct {
// Duration after which consumer is considered to be dead if heartbeat
// is not updated.
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Redis url for Redis streams and locks.
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultConsumerConfig = &ConsumerConfig{
var DefaultConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
RedisStream: "",
RedisGroup: "",
var TestConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".keepalive-timeout", DefaultConsumerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-url", DefaultConsumerConfig.RedisURL, "redis url for redis stream")
f.String(prefix+".redis-stream", DefaultConsumerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultConsumerConfig.RedisGroup, "redis stream consumer group name")
}

// Consumer implements a consumer for redis stream provides heartbeat to
// indicate it is alive.
type Consumer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ConsumerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ConsumerConfig
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
func NewConsumer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if streamName == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumer := &Consumer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
}
return consumer, nil
}
Expand Down Expand Up @@ -125,11 +105,11 @@ func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.cfg.RedisGroup,
Group: c.redisGroup,
Consumer: c.id,
// Receive only messages that were never delivered to any other consumer,
// that is, only new messages.
Streams: []string{c.cfg.RedisStream, ">"},
Streams: []string{c.redisStream, ">"},
Count: 1,
Block: time.Millisecond, // 0 seems to block the read instead of immediately returning
}).Result()
Expand All @@ -142,7 +122,6 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if len(res) != 1 || len(res[0].Messages) != 1 {
return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res)
}
log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID))
var (
value = res[0].Messages[0].Values[messageKey]
data, ok = (value).(string)
Expand All @@ -154,7 +133,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

log.Debug("Redis stream consuming", "consumer_id", c.id, "message_id", res[0].Messages[0].ID)
return &Message[Request]{
ID: res[0].Messages[0].ID,
Value: req,
Expand All @@ -170,7 +149,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
if _, err := c.client.XAck(ctx, c.cfg.RedisStream, c.cfg.RedisGroup, messageID).Result(); err != nil {
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
return nil
Expand Down
67 changes: 26 additions & 41 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
Expand All @@ -32,9 +31,11 @@ const (

type Producer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ProducerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ProducerConfig

promisesLock sync.RWMutex
promises map[string]*containers.Promise[Response]
Expand All @@ -49,10 +50,7 @@ type ProducerConfig struct {
// When enabled, messages that are sent to consumers that later die before
// processing them, will be re-inserted into the stream to be proceesed by
// another consumer
EnableReproduce bool `koanf:"enable-reproduce"`
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
EnableReproduce bool `koanf:"enable-reproduce"`
// Interval duration in which producer checks for pending messages delivered
// to the consumers that are currently inactive.
CheckPendingInterval time.Duration `koanf:"check-pending-interval"`
Expand All @@ -61,56 +59,42 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultProducerConfig = &ProducerConfig{
var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
}

var DefaultTestProducerConfig = &ProducerConfig{
var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.String(prefix+".redis-url", DefaultProducerConfig.RedisURL, "redis url for redis stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-stream", DefaultProducerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultProducerConfig.RedisGroup, "redis stream consumer group name")
}

func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
if client == nil {
return nil, fmt.Errorf("redis client cannot be nil")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream cannot be emtpy")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group cannot be empty")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
if streamName == "" {
return nil, fmt.Errorf("stream name cannot be empty")
}
return &Producer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
}, nil
}

Expand Down Expand Up @@ -142,7 +126,7 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
}
acked := make(map[string]Request)
for _, msg := range msgs {
if _, err := p.client.XAck(ctx, p.cfg.RedisStream, p.cfg.RedisGroup, msg.ID).Result(); err != nil {
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
continue
}
Expand Down Expand Up @@ -200,7 +184,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
return nil, fmt.Errorf("marshaling value: %w", err)
}
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.cfg.RedisStream,
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
Expand All @@ -224,6 +208,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
}

func (p *Producer[Request, Response]) Produce(ctx context.Context, value Request) (*containers.Promise[Response], error) {
log.Debug("Redis stream producing", "value", value)
p.once.Do(func() {
p.StopWaiter.CallIteratively(p.checkAndReproduce)
p.StopWaiter.CallIteratively(p.checkResponses)
Expand All @@ -248,8 +233,8 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Expand Down Expand Up @@ -285,8 +270,8 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
Expand Down
40 changes: 20 additions & 20 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ type testResponse struct {
Response string
}

func createGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
func createRedisGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupCreateMkStream(ctx, streamName, groupName, "$").Result(); err != nil {
// Stream name and group name are the same.
if _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result(); err != nil {
t.Fatalf("Error creating stream group: %v", err)
}
}

func destroyGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
func destroyRedisGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupDestroy(ctx, streamName, groupName).Result(); err != nil {
if _, err := client.XGroupDestroy(ctx, streamName, streamName).Result(); err != nil {
log.Debug("Error destroying a stream group", "error", err)
}
}
Expand All @@ -54,49 +55,48 @@ func (e *disableReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {

func producerCfg() *ProducerConfig {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cant you just return &TestProducerConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We run tests in parallel and some of them change property on it, e.g. set EnableReproduce to false. If you use same instance for concurrently running tests, tests will become flaky.

return &ProducerConfig{
EnableReproduce: DefaultTestProducerConfig.EnableReproduce,
CheckPendingInterval: DefaultTestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: DefaultTestProducerConfig.KeepAliveTimeout,
CheckResultInterval: DefaultTestProducerConfig.CheckResultInterval,
EnableReproduce: TestProducerConfig.EnableReproduce,
CheckPendingInterval: TestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: TestProducerConfig.KeepAliveTimeout,
CheckResultInterval: TestProducerConfig.CheckResultInterval,
}
}

func consumerCfg() *ConsumerConfig {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(same as producerCfg)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above (technically we don't change consumer config, but just to follow the pattern).

return &ConsumerConfig{
ResponseEntryTimeout: DefaultTestConsumerConfig.ResponseEntryTimeout,
KeepAliveTimeout: DefaultTestConsumerConfig.KeepAliveTimeout,
ResponseEntryTimeout: TestConsumerConfig.ResponseEntryTimeout,
KeepAliveTimeout: TestConsumerConfig.KeepAliveTimeout,
}
}

func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
t.Helper()
redisURL := redisutil.CreateTestRedis(ctx, t)
redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t))
if err != nil {
t.Fatalf("RedisClientFromURL() unexpected error: %v", err)
}
prodCfg, consCfg := producerCfg(), consumerCfg()
prodCfg.RedisURL, consCfg.RedisURL = redisURL, redisURL
streamName := uuid.NewString()
groupName := fmt.Sprintf("group_%s", streamName)
prodCfg.RedisGroup, consCfg.RedisGroup = groupName, groupName
prodCfg.RedisStream, consCfg.RedisStream = streamName, streamName
streamName := fmt.Sprintf("stream:%s", uuid.NewString())
for _, o := range opts {
o.apply(consCfg, prodCfg)
}
producer, err := NewProducer[testRequest, testResponse](prodCfg)
producer, err := NewProducer[testRequest, testResponse](redisClient, streamName, prodCfg)
if err != nil {
t.Fatalf("Error creating new producer: %v", err)
}

var consumers []*Consumer[testRequest, testResponse]
for i := 0; i < consumersCount; i++ {
c, err := NewConsumer[testRequest, testResponse](ctx, consCfg)
c, err := NewConsumer[testRequest, testResponse](redisClient, streamName, consCfg)
if err != nil {
t.Fatalf("Error creating new consumer: %v", err)
}
consumers = append(consumers, c)
}
createGroup(ctx, t, streamName, groupName, producer.client)
createRedisGroup(ctx, t, streamName, producer.client)
t.Cleanup(func() {
ctx := context.Background()
destroyGroup(ctx, t, streamName, groupName, producer.client)
destroyRedisGroup(ctx, t, streamName, producer.client)
var keys []string
for _, c := range consumers {
keys = append(keys, c.heartBeatKey())
Expand Down
Loading
Loading