Skip to content

Commit

Permalink
Merge branch 'master' into string_slice_fail
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi authored Jul 19, 2024
2 parents 5babad9 + 16434a2 commit 8183827
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 195 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ jobs:
if: matrix.test-mode == 'defaults'
run: |
packages=`go list ./...`
stdbuf -oL gotestsum --format short-verbose --packages=github.com/offchainlabs/nitro/cmd/nitro --rerun-fails=1 --no-color=false -- ./cmd/nitro -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 > >(stdbuf -oL tee full.log | grep -vE "INFO|seal")
with:
fail_ci_if_error: true
stdbuf -oL gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 --no-color=false -- ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 -tags=cionly > >(stdbuf -oL tee full.log | grep -vE "INFO|seal")
- name: run tests with race detection
if: matrix.test-mode == 'race'
Expand Down
2 changes: 1 addition & 1 deletion arbstate/daprovider/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func RecoverPayloadFromDasBatch(

keysetPreimage, err := keysetFetcher.GetKeysetByHash(ctx, cert.KeysetHash)
if err != nil {
log.Error("Couldn't get keyset", "err", err)
log.Error("Couldn't get keyset", "err", err, "keysetHash", common.Bytes2Hex(cert.KeysetHash[:]))
return nil, err
}
if preimageRecorder != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/staterecovery/staterecovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
)

func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheConfig *core.CacheConfig, startBlock uint64) error {
Expand All @@ -32,7 +32,7 @@ func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheCon
}
hashConfig := *hashdb.Defaults
hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit * 1024 * 1024
trieConfig := &trie.Config{
trieConfig := &triedb.Config{
Preimages: false,
HashDB: &hashConfig,
}
Expand Down
2 changes: 1 addition & 1 deletion go-ethereum
Submodule go-ethereum updated 164 files
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ require (
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/h2non/filetype v1.0.6 // indirect
github.com/hashicorp/go-bexpr v0.1.10 // indirect
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 // indirect
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
Expand Down Expand Up @@ -162,7 +162,7 @@ require (
go.opencensus.io v0.22.5 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.5.0
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoI
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 h1:3JQNjnMRil1yD0IfZKHF9GxxWKDJGj8I0IqOUol//sw=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc=
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4=
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
Expand Down
5 changes: 4 additions & 1 deletion pubsub/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
Expand All @@ -22,7 +23,9 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
if !strings.Contains(err.Error(), "no such key") {
log.Error("redis error", "err", err, "searching stream", streamName)
}
return false
}
return got != nil
Expand Down
182 changes: 121 additions & 61 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -59,27 +62,31 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
CheckPendingItems int64 `koanf:"check-pending-items"`
}

// DefaultProducerConfig holds the values that ProducerAddConfigAddOptions
// registers as the defaults of the producer flags.
var DefaultProducerConfig = ProducerConfig{
// Re-insert messages whose consumer is dead back into the stream.
EnableReproduce: true,
// How often pending messages are checked for inactive consumers.
CheckPendingInterval: time.Second,
// A consumer that performed no heartbeat for this long is considered inactive.
KeepAliveTimeout: 5 * time.Minute,
// How often the producer checks for results set by consumers.
CheckResultInterval: 5 * time.Second,
// Number of items screened during each check-pending pass.
CheckPendingItems: 256,
}

var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
EnableReproduce: false,
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
CheckPendingItems: 256,
}

// ProducerAddConfigAddOptions registers the producer options on f.
//
// Every flag is named prefix + ".<option>" and defaults to the corresponding
// field of DefaultProducerConfig.
func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
	f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
	f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
	// This option controls polling for consumer results, not the pending-message
	// scan, so its help text must differ from check-pending-interval.
	f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks whether results have been set by consumers")
	f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
	f.Int64(prefix+".check-pending-items", DefaultProducerConfig.CheckPendingItems, "items to screen during check-pending")
}

func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
Expand All @@ -99,70 +106,146 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
}, nil
}

func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) {
func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
for _, msg := range msgs {
if promise, found := p.promises[msg.ID]; found {
for _, msg := range msgIds {
if promise, found := p.promises[msg]; found {
promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
delete(p.promises, msg.ID)
delete(p.promises, msg)
}
}
}

// checkAndReproduce reproduce pending messages that were sent to consumers
// that are currently inactive.
func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration {
msgs, err := p.checkPending(ctx)
staleIds, err := p.checkPending(ctx)
if err != nil {
log.Error("Checking pending messages", "error", err)
return p.cfg.CheckPendingInterval
}
if len(msgs) == 0 {
if len(staleIds) == 0 {
return p.cfg.CheckPendingInterval
}
if !p.cfg.EnableReproduce {
p.errorPromisesFor(msgs)
return p.cfg.CheckPendingInterval
if p.cfg.EnableReproduce {
err = p.reproduceIds(ctx, staleIds)
if err != nil {
log.Warn("filed reproducing messages", "err", err)
}
} else {
p.errorPromisesFor(staleIds)
}
acked := make(map[string]Request)
for _, msg := range msgs {
return p.cfg.CheckPendingInterval
}

func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
log.Info("Attempting to claim", "messages", staleIds)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: staleIds,
}).Result()
if err != nil {
return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err)
}
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
log.Error("redis producer reproduce: message not string", "id", msg.ID, "value", msg.Values[messageKey])
continue
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
log.Error("redis producer reproduce: message not a request", "id", msg.ID, "err", err, "value", msg.Values[messageKey])
continue
}
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err)
continue
}
acked[msg.ID] = msg.Value
}
for k, v := range acked {
// Only re-insert messages that were removed the the pending list first.
_, err := p.reproduce(ctx, v, k)
if err != nil {
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
if _, err := p.reproduce(ctx, req, msg.ID); err != nil {
log.Error("redis producer reproduce: error", "err", err)
}
}
return p.cfg.CheckPendingInterval
return nil
}

// setMinIdInt lowers *min to the given id when the id sorts before it.
//
// id must have the form "<timestamp>-<serial>", where both parts are base-10
// unsigned 64-bit integers. min holds the smallest {timestamp, serial} pair
// seen so far and is updated in place.
//
// An error is returned when id does not consist of exactly two "-" separated
// parts, when the timestamp cannot be parsed, or when the serial cannot be
// parsed. The serial is only parsed if the timestamp is not already larger
// than the current minimum.
func setMinIdInt(min *[2]uint64, id string) error {
	fields := strings.Split(id, "-")
	if len(fields) != 2 {
		return fmt.Errorf("invalid i.d: %v", id)
	}
	ts, err := strconv.ParseUint(fields[0], 10, 64)
	if err != nil {
		return fmt.Errorf("invalid i.d: %v err: %w", id, err)
	}
	// A larger timestamp can never become the minimum, so stop before even
	// looking at the serial part.
	if ts > min[0] {
		return nil
	}
	serial, err := strconv.ParseUint(fields[1], 10, 64)
	if err != nil {
		return fmt.Errorf("invalid i.d serial: %v err: %w", id, err)
	}
	switch {
	case ts < min[0]:
		// Strictly older timestamp: replace the whole pair.
		*min = [2]uint64{ts, serial}
	case serial < min[1]:
		// Same timestamp: keep the smaller serial.
		min[1] = serial
	}
	return nil
}

// checkResponses checks iteratively whether response for the promise is ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
errored := 0
for id, promise := range p.promises {
if ctx.Err() != nil {
return 0
}
res, err := p.client.Get(ctx, id).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
errSetId := setMinIdInt(&minIdInt, id)
if errSetId != nil {
log.Error("error setting minId", "err", err)
return p.cfg.CheckResultInterval
}
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
}
log.Error("Error reading value in redis", "key", id, "error", err)
continue
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
log.Error("Error unmarshaling", "value", res, "error", err)
continue
errored++
} else {
promise.Produce(resp)
responded++
}
promise.Produce(resp)
delete(p.promises, id)
}
var trimmed int64
var trimErr error
minId := "+"
if minIdInt[0] < math.MaxUint64 {
minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
}
log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
return p.cfg.CheckResultInterval
}

Expand All @@ -184,20 +267,23 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
if err != nil {
return nil, fmt.Errorf("marshaling value: %w", err)
}
// Taking promisesLock before sending XADD ensures that promise IDs are
// always ascending.
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
return nil, fmt.Errorf("adding values to redis: %w", err)
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
promise := p.promises[oldKey]
if oldKey != "" && promise == nil {
// This will happen if the old consumer became inactive but then ack_d
// the message afterwards.
return nil, fmt.Errorf("error reproducing the message, could not find existing one")
// don't error
log.Warn("tried reproducing a message but it wasn't found - probably got response", "oldKey", oldKey)
}
if oldKey == "" || promise == nil {
pr := containers.NewPromise[Response](nil)
Expand Down Expand Up @@ -232,13 +318,14 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
return found
}

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
// checkPending returns the IDs of pending messages whose consumer does not appear to be alive.
func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Count: p.cfg.CheckPendingItems,
}).Result()

if err != nil && !errors.Is(err, redis.Nil) {
Expand All @@ -247,6 +334,9 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
if len(pendingMessages) == 0 {
return nil, nil
}
if len(pendingMessages) >= int(p.cfg.CheckPendingItems) {
log.Warn("redis producer: many pending items found", "stream", p.redisStream, "check-pending-items", p.cfg.CheckPendingItems)
}
// IDs of the pending messages with inactive consumers.
var ids []string
active := make(map[string]bool)
Expand All @@ -265,35 +355,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
ids = append(ids, msg.ID)
}
if len(ids) == 0 {
log.Trace("There are no pending messages with inactive consumers")
return nil, nil
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
}).Result()
if err != nil {
return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err)
}
var res []*Message[Request]
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
}
res = append(res, &Message[Request]{
ID: msg.ID,
Value: req,
})
}
return res, nil
return ids, nil
}
Loading

0 comments on commit 8183827

Please sign in to comment.