Skip to content

Commit

Permalink
Merge branch 'master' into pebble-extra-options
Browse files Browse the repository at this point in the history
  • Loading branch information
magicxyyz committed Apr 25, 2024
2 parents 028fd31 + fb101a8 commit 67e1692
Show file tree
Hide file tree
Showing 18 changed files with 625 additions and 282 deletions.
2 changes: 1 addition & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.ValidationServerConfigs[0].URL != "" {
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
Expand Down
4 changes: 1 addition & 3 deletions gethhook/geth-hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ func init() {
precompileErrors := make(map[[4]byte]abi.Error)
for addr, precompile := range precompiles.Precompiles() {
for _, errABI := range precompile.Precompile().GetErrorABIs() {
var id [4]byte
copy(id[:], errABI.ID[:4])
precompileErrors[id] = errABI
precompileErrors[[4]byte(errABI.ID.Bytes())] = errABI
}
var wrapped vm.AdvancedPrecompile = ArbosPrecompileWrapper{precompile}
vm.PrecompiledContractsArbitrum[addr] = wrapped
Expand Down
57 changes: 18 additions & 39 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
Expand All @@ -21,68 +20,49 @@ type ConsumerConfig struct {
// Duration after which consumer is considered to be dead if heartbeat
// is not updated.
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Redis url for Redis streams and locks.
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultConsumerConfig = &ConsumerConfig{
var DefaultConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
RedisStream: "",
RedisGroup: "",
var TestConsumerConfig = ConsumerConfig{
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".keepalive-timeout", DefaultConsumerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-url", DefaultConsumerConfig.RedisURL, "redis url for redis stream")
f.String(prefix+".redis-stream", DefaultConsumerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultConsumerConfig.RedisGroup, "redis stream consumer group name")
}

// Consumer implements a consumer for redis stream provides heartbeat to
// indicate it is alive.
type Consumer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ConsumerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ConsumerConfig
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
func NewConsumer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if streamName == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumer := &Consumer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
}
return consumer, nil
}
Expand Down Expand Up @@ -125,11 +105,11 @@ func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.cfg.RedisGroup,
Group: c.redisGroup,
Consumer: c.id,
// Receive only messages that were never delivered to any other consumer,
// that is, only new messages.
Streams: []string{c.cfg.RedisStream, ">"},
Streams: []string{c.redisStream, ">"},
Count: 1,
Block: time.Millisecond, // 0 seems to block the read instead of immediately returning
}).Result()
Expand All @@ -142,7 +122,6 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if len(res) != 1 || len(res[0].Messages) != 1 {
return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res)
}
log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID))
var (
value = res[0].Messages[0].Values[messageKey]
data, ok = (value).(string)
Expand All @@ -154,7 +133,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

log.Debug("Redis stream consuming", "consumer_id", c.id, "message_id", res[0].Messages[0].ID)
return &Message[Request]{
ID: res[0].Messages[0].ID,
Value: req,
Expand All @@ -170,7 +149,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
if _, err := c.client.XAck(ctx, c.cfg.RedisStream, c.cfg.RedisGroup, messageID).Result(); err != nil {
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
return nil
Expand Down
68 changes: 27 additions & 41 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)
Expand All @@ -32,9 +31,11 @@ const (

type Producer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ProducerConfig
id string
client redis.UniversalClient
redisStream string
redisGroup string
cfg *ProducerConfig

promisesLock sync.RWMutex
promises map[string]*containers.Promise[Response]
Expand All @@ -49,10 +50,7 @@ type ProducerConfig struct {
// When enabled, messages that are sent to consumers that later die before
// processing them, will be re-inserted into the stream to be proceesed by
// another consumer
EnableReproduce bool `koanf:"enable-reproduce"`
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
EnableReproduce bool `koanf:"enable-reproduce"`
// Interval duration in which producer checks for pending messages delivered
// to the consumers that are currently inactive.
CheckPendingInterval time.Duration `koanf:"check-pending-interval"`
Expand All @@ -61,56 +59,43 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultProducerConfig = &ProducerConfig{
var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
}

var DefaultTestProducerConfig = &ProducerConfig{
var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
RedisStream: "",
RedisGroup: "",
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.String(prefix+".redis-url", DefaultProducerConfig.RedisURL, "redis url for redis stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-stream", DefaultProducerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultProducerConfig.RedisGroup, "redis stream consumer group name")
}

func NewProducer[Request any, Response any](cfg *ProducerConfig) (*Producer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
if client == nil {
return nil, fmt.Errorf("redis client cannot be nil")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream cannot be emtpy")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group cannot be empty")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
if streamName == "" {
return nil, fmt.Errorf("stream name cannot be empty")
}
return &Producer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
id: uuid.NewString(),
client: client,
redisStream: streamName,
redisGroup: streamName, // There is 1-1 mapping of redis stream and consumer group.
cfg: cfg,
promises: make(map[string]*containers.Promise[Response]),
}, nil
}

Expand Down Expand Up @@ -142,7 +127,7 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
}
acked := make(map[string]Request)
for _, msg := range msgs {
if _, err := p.client.XAck(ctx, p.cfg.RedisStream, p.cfg.RedisGroup, msg.ID).Result(); err != nil {
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
continue
}
Expand Down Expand Up @@ -200,7 +185,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
return nil, fmt.Errorf("marshaling value: %w", err)
}
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.cfg.RedisStream,
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
Expand All @@ -224,6 +209,7 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
}

func (p *Producer[Request, Response]) Produce(ctx context.Context, value Request) (*containers.Promise[Response], error) {
log.Debug("Redis stream producing", "value", value)
p.once.Do(func() {
p.StopWaiter.CallIteratively(p.checkAndReproduce)
p.StopWaiter.CallIteratively(p.checkResponses)
Expand All @@ -248,8 +234,8 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Expand Down Expand Up @@ -285,8 +271,8 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.cfg.RedisStream,
Group: p.cfg.RedisGroup,
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
Expand Down
40 changes: 20 additions & 20 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ type testResponse struct {
Response string
}

func createGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
func createRedisGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupCreateMkStream(ctx, streamName, groupName, "$").Result(); err != nil {
// Stream name and group name are the same.
if _, err := client.XGroupCreateMkStream(ctx, streamName, streamName, "$").Result(); err != nil {
t.Fatalf("Error creating stream group: %v", err)
}
}

func destroyGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
func destroyRedisGroup(ctx context.Context, t *testing.T, streamName string, client redis.UniversalClient) {
t.Helper()
if _, err := client.XGroupDestroy(ctx, streamName, groupName).Result(); err != nil {
if _, err := client.XGroupDestroy(ctx, streamName, streamName).Result(); err != nil {
log.Debug("Error destroying a stream group", "error", err)
}
}
Expand All @@ -54,49 +55,48 @@ func (e *disableReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {

func producerCfg() *ProducerConfig {
return &ProducerConfig{
EnableReproduce: DefaultTestProducerConfig.EnableReproduce,
CheckPendingInterval: DefaultTestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: DefaultTestProducerConfig.KeepAliveTimeout,
CheckResultInterval: DefaultTestProducerConfig.CheckResultInterval,
EnableReproduce: TestProducerConfig.EnableReproduce,
CheckPendingInterval: TestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: TestProducerConfig.KeepAliveTimeout,
CheckResultInterval: TestProducerConfig.CheckResultInterval,
}
}

func consumerCfg() *ConsumerConfig {
return &ConsumerConfig{
ResponseEntryTimeout: DefaultTestConsumerConfig.ResponseEntryTimeout,
KeepAliveTimeout: DefaultTestConsumerConfig.KeepAliveTimeout,
ResponseEntryTimeout: TestConsumerConfig.ResponseEntryTimeout,
KeepAliveTimeout: TestConsumerConfig.KeepAliveTimeout,
}
}

func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
t.Helper()
redisURL := redisutil.CreateTestRedis(ctx, t)
redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t))
if err != nil {
t.Fatalf("RedisClientFromURL() unexpected error: %v", err)
}
prodCfg, consCfg := producerCfg(), consumerCfg()
prodCfg.RedisURL, consCfg.RedisURL = redisURL, redisURL
streamName := uuid.NewString()
groupName := fmt.Sprintf("group_%s", streamName)
prodCfg.RedisGroup, consCfg.RedisGroup = groupName, groupName
prodCfg.RedisStream, consCfg.RedisStream = streamName, streamName
streamName := fmt.Sprintf("stream:%s", uuid.NewString())
for _, o := range opts {
o.apply(consCfg, prodCfg)
}
producer, err := NewProducer[testRequest, testResponse](prodCfg)
producer, err := NewProducer[testRequest, testResponse](redisClient, streamName, prodCfg)
if err != nil {
t.Fatalf("Error creating new producer: %v", err)
}

var consumers []*Consumer[testRequest, testResponse]
for i := 0; i < consumersCount; i++ {
c, err := NewConsumer[testRequest, testResponse](ctx, consCfg)
c, err := NewConsumer[testRequest, testResponse](redisClient, streamName, consCfg)
if err != nil {
t.Fatalf("Error creating new consumer: %v", err)
}
consumers = append(consumers, c)
}
createGroup(ctx, t, streamName, groupName, producer.client)
createRedisGroup(ctx, t, streamName, producer.client)
t.Cleanup(func() {
ctx := context.Background()
destroyGroup(ctx, t, streamName, groupName, producer.client)
destroyRedisGroup(ctx, t, streamName, producer.client)
var keys []string
for _, c := range consumers {
keys = append(keys, c.heartBeatKey())
Expand Down
Loading

0 comments on commit 67e1692

Please sign in to comment.