Skip to content

Commit

Permalink
Merge branch 'master' into merkle-perf
Browse files Browse the repository at this point in the history
  • Loading branch information
eljobe committed Jul 18, 2024
2 parents 4ddf81f + 16434a2 commit fe3e67c
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 163 deletions.
2 changes: 1 addition & 1 deletion arbstate/daprovider/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func RecoverPayloadFromDasBatch(

keysetPreimage, err := keysetFetcher.GetKeysetByHash(ctx, cert.KeysetHash)
if err != nil {
log.Error("Couldn't get keyset", "err", err)
log.Error("Couldn't get keyset", "err", err, "keysetHash", common.Bytes2Hex(cert.KeysetHash[:]))
return nil, err
}
if preimageRecorder != nil {
Expand Down
5 changes: 4 additions & 1 deletion pubsub/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
Expand All @@ -22,7 +23,9 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
if !strings.Contains(err.Error(), "no such key") {
log.Error("redis error", "err", err, "searching stream", streamName)
}
return false
}
return got != nil
Expand Down
182 changes: 121 additions & 61 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -59,27 +62,31 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
CheckPendingItems int64 `koanf:"check-pending-items"`
}

var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
CheckPendingItems: 256,
}

var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
EnableReproduce: false,
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
CheckPendingItems: 256,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.Int64(prefix+".check-pending-items", DefaultProducerConfig.CheckPendingItems, "items to screen during check-pending")
}

func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
Expand All @@ -99,70 +106,146 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
}, nil
}

func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) {
func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
for _, msg := range msgs {
if promise, found := p.promises[msg.ID]; found {
for _, msg := range msgIds {
if promise, found := p.promises[msg]; found {
promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
delete(p.promises, msg.ID)
delete(p.promises, msg)
}
}
}

// checkAndReproduce reproduce pending messages that were sent to consumers
// that are currently inactive.
func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration {
msgs, err := p.checkPending(ctx)
staleIds, err := p.checkPending(ctx)
if err != nil {
log.Error("Checking pending messages", "error", err)
return p.cfg.CheckPendingInterval
}
if len(msgs) == 0 {
if len(staleIds) == 0 {
return p.cfg.CheckPendingInterval
}
if !p.cfg.EnableReproduce {
p.errorPromisesFor(msgs)
return p.cfg.CheckPendingInterval
if p.cfg.EnableReproduce {
err = p.reproduceIds(ctx, staleIds)
if err != nil {
log.Warn("filed reproducing messages", "err", err)
}
} else {
p.errorPromisesFor(staleIds)
}
acked := make(map[string]Request)
for _, msg := range msgs {
return p.cfg.CheckPendingInterval
}

func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
log.Info("Attempting to claim", "messages", staleIds)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: staleIds,
}).Result()
if err != nil {
return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err)
}
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
log.Error("redis producer reproduce: message not string", "id", msg.ID, "value", msg.Values[messageKey])
continue
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
log.Error("redis producer reproduce: message not a request", "id", msg.ID, "err", err, "value", msg.Values[messageKey])
continue
}
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err)
continue
}
acked[msg.ID] = msg.Value
}
for k, v := range acked {
// Only re-insert messages that were removed the the pending list first.
_, err := p.reproduce(ctx, v, k)
if err != nil {
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
if _, err := p.reproduce(ctx, req, msg.ID); err != nil {
log.Error("redis producer reproduce: error", "err", err)
}
}
return p.cfg.CheckPendingInterval
return nil
}

func setMinIdInt(min *[2]uint64, id string) error {
idParts := strings.Split(id, "-")
if len(idParts) != 2 {
return fmt.Errorf("invalid i.d: %v", id)
}
idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d: %v err: %w", id, err)
}
if idTimeStamp > min[0] {
return nil
}
idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d serial: %v err: %w", id, err)
}
if idTimeStamp < min[0] {
min[0] = idTimeStamp
min[1] = idSerial
return nil
}
// idTimeStamp == min[0]
if idSerial < min[1] {
min[1] = idSerial
}
return nil
}

// checkResponses checks iteratively whether response for the promise is ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
errored := 0
for id, promise := range p.promises {
if ctx.Err() != nil {
return 0
}
res, err := p.client.Get(ctx, id).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
errSetId := setMinIdInt(&minIdInt, id)
if errSetId != nil {
log.Error("error setting minId", "err", err)
return p.cfg.CheckResultInterval
}
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
}
log.Error("Error reading value in redis", "key", id, "error", err)
continue
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
log.Error("Error unmarshaling", "value", res, "error", err)
continue
errored++
} else {
promise.Produce(resp)
responded++
}
promise.Produce(resp)
delete(p.promises, id)
}
var trimmed int64
var trimErr error
minId := "+"
if minIdInt[0] < math.MaxUint64 {
minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
}
log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
return p.cfg.CheckResultInterval
}

Expand All @@ -184,20 +267,23 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
if err != nil {
return nil, fmt.Errorf("marshaling value: %w", err)
}
// catching the promiseLock before we sendXadd makes sure promise ids will
// be always ascending
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
return nil, fmt.Errorf("adding values to redis: %w", err)
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
promise := p.promises[oldKey]
if oldKey != "" && promise == nil {
// This will happen if the old consumer became inactive but then ack_d
// the message afterwards.
return nil, fmt.Errorf("error reproducing the message, could not find existing one")
// don't error
log.Warn("tried reproducing a message but it wasn't found - probably got response", "oldKey", oldKey)
}
if oldKey == "" || promise == nil {
pr := containers.NewPromise[Response](nil)
Expand Down Expand Up @@ -232,13 +318,14 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
return found
}

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
// returns ids of pending messages that's worker doesn't appear alive
func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Count: p.cfg.CheckPendingItems,
}).Result()

if err != nil && !errors.Is(err, redis.Nil) {
Expand All @@ -247,6 +334,9 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
if len(pendingMessages) == 0 {
return nil, nil
}
if len(pendingMessages) >= int(p.cfg.CheckPendingItems) {
log.Warn("redis producer: many pending items found", "stream", p.redisStream, "check-pending-items", p.cfg.CheckPendingItems)
}
// IDs of the pending messages with inactive consumers.
var ids []string
active := make(map[string]bool)
Expand All @@ -265,35 +355,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
ids = append(ids, msg.ID)
}
if len(ids) == 0 {
log.Trace("There are no pending messages with inactive consumers")
return nil, nil
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
}).Result()
if err != nil {
return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err)
}
var res []*Message[Request]
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
}
res = append(res, &Message[Request]{
ID: msg.ID,
Value: req,
})
}
return res, nil
return ids, nil
}
Loading

0 comments on commit fe3e67c

Please sign in to comment.