diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 56c587a41d..7fea04de4b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -145,9 +145,7 @@ jobs: if: matrix.test-mode == 'defaults' run: | packages=`go list ./...` - stdbuf -oL gotestsum --format short-verbose --packages=github.com/offchainlabs/nitro/cmd/nitro --rerun-fails=1 --no-color=false -- ./cmd/nitro -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 > >(stdbuf -oL tee full.log | grep -vE "INFO|seal") - with: - fail_ci_if_error: true + stdbuf -oL gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 --no-color=false -- ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 -tags=cionly > >(stdbuf -oL tee full.log | grep -vE "INFO|seal") - name: run tests with race detection if: matrix.test-mode == 'race' diff --git a/arbstate/daprovider/util.go b/arbstate/daprovider/util.go index 8f880b9228..48cec884a3 100644 --- a/arbstate/daprovider/util.go +++ b/arbstate/daprovider/util.go @@ -188,7 +188,7 @@ func RecoverPayloadFromDasBatch( keysetPreimage, err := keysetFetcher.GetKeysetByHash(ctx, cert.KeysetHash) if err != nil { - log.Error("Couldn't get keyset", "err", err) + log.Error("Couldn't get keyset", "err", err, "keysetHash", common.Bytes2Hex(cert.KeysetHash[:])) return nil, err } if preimageRecorder != nil { diff --git a/cmd/staterecovery/staterecovery.go b/cmd/staterecovery/staterecovery.go index 58ad06ad14..19ed51ef2d 100644 --- a/cmd/staterecovery/staterecovery.go +++ b/cmd/staterecovery/staterecovery.go @@ -9,8 +9,8 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" - "github.com/ethereum/go-ethereum/trie/triedb/hashdb" + "github.com/ethereum/go-ethereum/triedb" + "github.com/ethereum/go-ethereum/triedb/hashdb" ) func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheConfig *core.CacheConfig, startBlock uint64) error { @@ -32,7 +32,7 @@ func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheCon } hashConfig := *hashdb.Defaults hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit * 1024 * 1024 - trieConfig := &trie.Config{ + trieConfig := &triedb.Config{ Preimages: false, HashDB: &hashConfig, } diff --git a/go-ethereum b/go-ethereum index c186780e82..18256c2dfc 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit c186780e82856be27d1576db4d1fd79984562165 +Subproject commit 18256c2dfcce8fd567aa05e03fbc11a4c17aa550 diff --git a/go.mod b/go.mod index 00cee1a99a..d0c8d5e719 100644 --- a/go.mod +++ b/go.mod @@ -118,7 +118,7 @@ require ( github.com/graph-gophers/graphql-go v1.3.0 // indirect github.com/h2non/filetype v1.0.6 // indirect github.com/hashicorp/go-bexpr v0.1.10 // indirect - github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 // indirect + github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect @@ -162,7 +162,7 @@ require ( go.opencensus.io v0.22.5 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.21.0 // indirect - golang.org/x/sync v0.5.0 // indirect + golang.org/x/sync v0.5.0 golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/go.sum b/go.sum index 8676c270c4..ff4726b22f 100644 --- a/go.sum +++ b/go.sum @@ -420,8 +420,8 @@ github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoI github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M= github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= -github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 h1:3JQNjnMRil1yD0IfZKHF9GxxWKDJGj8I0IqOUol//sw= -github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= +github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= +github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU= diff --git a/pubsub/common.go b/pubsub/common.go index 9f05304e46..d7f041af15 100644 --- a/pubsub/common.go +++ b/pubsub/common.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "strings" "github.com/ethereum/go-ethereum/log" "github.com/go-redis/redis/v8" @@ -22,7 +23,9 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool { got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result() if err != nil { - log.Error("Reading redis streams", "error", err) + if !strings.Contains(err.Error(), "no such key") { + log.Error("redis error", "err", err, "searching stream", streamName) + } return false } return got != nil diff --git a/pubsub/producer.go b/pubsub/producer.go index 074670ca0f..2b1cdb5e3f 100644 --- a/pubsub/producer.go +++ b/pubsub/producer.go @@ -13,6 +13,9 @@ import ( "encoding/json" "errors" "fmt" + "math" + "strconv" + "strings" "sync" "time" @@ -59,6 +62,7 @@ type ProducerConfig struct { KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"` // Interval duration for checking the result set by consumers. CheckResultInterval time.Duration `koanf:"check-result-interval"` + CheckPendingItems int64 `koanf:"check-pending-items"` } var DefaultProducerConfig = ProducerConfig{ @@ -66,13 +70,15 @@ var DefaultProducerConfig = ProducerConfig{ CheckPendingInterval: time.Second, KeepAliveTimeout: 5 * time.Minute, CheckResultInterval: 5 * time.Second, + CheckPendingItems: 256, } var TestProducerConfig = ProducerConfig{ - EnableReproduce: true, + EnableReproduce: false, CheckPendingInterval: 10 * time.Millisecond, KeepAliveTimeout: 100 * time.Millisecond, CheckResultInterval: 5 * time.Millisecond, + CheckPendingItems: 256, } func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) { @@ -80,6 +86,7 @@ func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive") f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive") f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed") + f.Int64(prefix+".check-pending-items", DefaultProducerConfig.CheckPendingItems, "items to screen during check-pending") } func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) { @@ -99,13 +106,13 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream }, nil } -func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) { +func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) { p.promisesLock.Lock() defer p.promisesLock.Unlock() - for _, msg := range msgs { - if promise, found := p.promises[msg.ID]; found { + for _, msg := range msgIds { + if promise, found := p.promises[msg]; found { promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request")) - delete(p.promises, msg.ID) + delete(p.promises, msg) } } } @@ -113,56 +120,132 @@ func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) // checkAndReproduce reproduce pending messages that were sent to consumers // that are currently inactive. func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration { - msgs, err := p.checkPending(ctx) + staleIds, err := p.checkPending(ctx) if err != nil { log.Error("Checking pending messages", "error", err) return p.cfg.CheckPendingInterval } - if len(msgs) == 0 { + if len(staleIds) == 0 { return p.cfg.CheckPendingInterval } - if !p.cfg.EnableReproduce { - p.errorPromisesFor(msgs) - return p.cfg.CheckPendingInterval + if p.cfg.EnableReproduce { + err = p.reproduceIds(ctx, staleIds) + if err != nil { + log.Warn("filed reproducing messages", "err", err) + } + } else { + p.errorPromisesFor(staleIds) } - acked := make(map[string]Request) - for _, msg := range msgs { + return p.cfg.CheckPendingInterval +} + +func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error { + log.Info("Attempting to claim", "messages", staleIds) + claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{ + Stream: p.redisStream, + Group: p.redisGroup, + Consumer: p.id, + MinIdle: p.cfg.KeepAliveTimeout, + Messages: staleIds, + }).Result() + if err != nil { + return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err) + } + for _, msg := range claimedMsgs { + data, ok := (msg.Values[messageKey]).(string) + if !ok { + log.Error("redis producer reproduce: message not string", "id", msg.ID, "value", msg.Values[messageKey]) + continue + } + var req Request + if err := json.Unmarshal([]byte(data), &req); err != nil { + log.Error("redis producer reproduce: message not a request", "id", msg.ID, "err", err, "value", msg.Values[messageKey]) + continue + } if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil { - log.Error("ACKing message", "error", err) + log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err) continue } - acked[msg.ID] = msg.Value - } - for k, v := range acked { // Only re-insert messages that were removed the the pending list first. - _, err := p.reproduce(ctx, v, k) - if err != nil { - log.Error("Re-inserting pending messages with inactive consumers", "error", err) + if _, err := p.reproduce(ctx, req, msg.ID); err != nil { + log.Error("redis producer reproduce: error", "err", err) } } - return p.cfg.CheckPendingInterval + return nil +} + +func setMinIdInt(min *[2]uint64, id string) error { + idParts := strings.Split(id, "-") + if len(idParts) != 2 { + return fmt.Errorf("invalid i.d: %v", id) + } + idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64) + if err != nil { + return fmt.Errorf("invalid i.d: %v err: %w", id, err) + } + if idTimeStamp > min[0] { + return nil + } + idSerial, err := strconv.ParseUint(idParts[1], 10, 64) + if err != nil { + return fmt.Errorf("invalid i.d serial: %v err: %w", id, err) + } + if idTimeStamp < min[0] { + min[0] = idTimeStamp + min[1] = idSerial + return nil + } + // idTimeStamp == min[0] + if idSerial < min[1] { + min[1] = idSerial + } + return nil } // checkResponses checks iteratively whether response for the promise is ready. func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration { + minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64} p.promisesLock.Lock() defer p.promisesLock.Unlock() + responded := 0 + errored := 0 for id, promise := range p.promises { + if ctx.Err() != nil { + return 0 + } res, err := p.client.Get(ctx, id).Result() if err != nil { - if errors.Is(err, redis.Nil) { - continue + errSetId := setMinIdInt(&minIdInt, id) + if errSetId != nil { + log.Error("error setting minId", "err", err) + return p.cfg.CheckResultInterval + } + if !errors.Is(err, redis.Nil) { + log.Error("Error reading value in redis", "key", id, "error", err) } - log.Error("Error reading value in redis", "key", id, "error", err) + continue } var resp Response if err := json.Unmarshal([]byte(res), &resp); err != nil { + promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err)) log.Error("Error unmarshaling", "value", res, "error", err) - continue + errored++ + } else { + promise.Produce(resp) + responded++ } - promise.Produce(resp) delete(p.promises, id) } + var trimmed int64 + var trimErr error + minId := "+" + if minIdInt[0] < math.MaxUint64 { + minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1]) + trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result() + } else { + trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result() + } + log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr) return p.cfg.CheckResultInterval } @@ -184,6 +267,10 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque if err != nil { return nil, fmt.Errorf("marshaling value: %w", err) } + // catching the promiseLock before we sendXadd makes sure promise ids will + // be always ascending + p.promisesLock.Lock() + defer p.promisesLock.Unlock() id, err := p.client.XAdd(ctx, &redis.XAddArgs{ Stream: p.redisStream, Values: map[string]any{messageKey: val}, @@ -191,13 +278,12 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque if err != nil { return nil, fmt.Errorf("adding values to redis: %w", err) } - p.promisesLock.Lock() - defer p.promisesLock.Unlock() promise := p.promises[oldKey] if oldKey != "" && promise == nil { // This will happen if the old consumer became inactive but then ack_d // the message afterwards. - return nil, fmt.Errorf("error reproducing the message, could not find existing one") + // don't error + log.Warn("tried reproducing a message but it wasn't found - probably got response", "oldKey", oldKey) } if oldKey == "" || promise == nil { pr := containers.NewPromise[Response](nil) @@ -232,13 +318,14 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool { return found } -func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) { +// returns ids of pending messages that's worker doesn't appear alive +func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) { pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: p.redisStream, Group: p.redisGroup, Start: "-", End: "+", - Count: 100, + Count: p.cfg.CheckPendingItems, }).Result() if err != nil && !errors.Is(err, redis.Nil) { @@ -247,6 +334,9 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess if len(pendingMessages) == 0 { return nil, nil } + if len(pendingMessages) >= int(p.cfg.CheckPendingItems) { + log.Warn("redis producer: many pending items found", "stream", p.redisStream, "check-pending-items", p.cfg.CheckPendingItems) + } // IDs of the pending messages with inactive consumers. var ids []string active := make(map[string]bool) @@ -265,35 +355,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess } ids = append(ids, msg.ID) } - if len(ids) == 0 { - log.Trace("There are no pending messages with inactive consumers") - return nil, nil - } - log.Info("Attempting to claim", "messages", ids) - claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{ - Stream: p.redisStream, - Group: p.redisGroup, - Consumer: p.id, - MinIdle: p.cfg.KeepAliveTimeout, - Messages: ids, - }).Result() - if err != nil { - return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err) - } - var res []*Message[Request] - for _, msg := range claimedMsgs { - data, ok := (msg.Values[messageKey]).(string) - if !ok { - return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey]) - } - var req Request - if err := json.Unmarshal([]byte(data), &req); err != nil { - return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err) - } - res = append(res, &Message[Request]{ - ID: msg.ID, - Value: req, - }) - } - return res, nil + return ids, nil } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 72504602e3..a3140e32ac 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -49,10 +49,12 @@ type configOpt interface { apply(consCfg *ConsumerConfig, prodCfg *ProducerConfig) } -type disableReproduce struct{} +type withReproduce struct { + reproduce bool +} -func (e *disableReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) { - prodCfg.EnableReproduce = false +func (e *withReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) { + prodCfg.EnableReproduce = e.reproduce } func producerCfg() *ProducerConfig { @@ -61,6 +63,7 @@ func producerCfg() *ProducerConfig { CheckPendingInterval: TestProducerConfig.CheckPendingInterval, KeepAliveTimeout: TestProducerConfig.KeepAliveTimeout, CheckResultInterval: TestProducerConfig.CheckResultInterval, + CheckPendingItems: TestProducerConfig.CheckPendingItems, } } @@ -71,7 +74,7 @@ func consumerCfg() *ConsumerConfig { } } -func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) { +func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (redis.UniversalClient, string, *Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) { t.Helper() redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t)) if err != nil { @@ -107,7 +110,7 @@ func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) log.Debug("Error deleting heartbeat keys", "error", err) } }) - return producer, consumers + return redisClient, streamName, producer, consumers } func messagesMaps(n int) []map[string]string { @@ -118,10 +121,14 @@ func messagesMaps(n int) []map[string]string { return ret } +func msgForIndex(idx int) string { + return fmt.Sprintf("msg: %d", idx) +} + func wantMessages(n int) []string { var ret []string for i := 0; i < n; i++ { - ret = append(ret, fmt.Sprintf("msg: %d", i)) + ret = append(ret, msgForIndex(i)) } sort.Strings(ret) return ret @@ -148,26 +155,25 @@ func produceMessages(ctx context.Context, msgs []string, producer *Producer[test return promises, nil } -func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, error) { +func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, []int) { var ( responses []string - errs []error + errs []int ) - for _, p := range promises { + for idx, p := range promises { res, err := p.Await(ctx) if err != nil { - errs = append(errs, err) + errs = append(errs, idx) continue } responses = append(responses, res.Response) } - return responses, errors.Join(errs...) + return responses, errs } // consume messages from every consumer except stopped ones. -func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse]) ([]map[string]string, [][]string) { +func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse], gotMessages []map[string]string) [][]string { t.Helper() - gotMessages := messagesMaps(consumersCount) wantResponses := make([][]string, consumersCount) for idx := 0; idx < consumersCount; idx++ { if consumers[idx].Stopped() { @@ -199,7 +205,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques } }) } - return gotMessages, wantResponses + return wantResponses } func TestRedisProduce(t *testing.T) { @@ -208,43 +214,56 @@ func TestRedisProduce(t *testing.T) { for _, tc := range []struct { name string killConsumers bool + autoRecover bool }{ { name: "all consumers are active", killConsumers: false, + autoRecover: false, }, { name: "some consumers killed, others should take over their work", killConsumers: true, + autoRecover: true, + }, + { + name: "some consumers killed, should return failure", + killConsumers: true, + autoRecover: false, }, } { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - producer, consumers := newProducerConsumers(ctx, t) + redisClient, streamName, producer, consumers := newProducerConsumers(ctx, t, &withReproduce{tc.autoRecover}) producer.Start(ctx) wantMsgs := wantMessages(messagesCount) promises, err := produceMessages(ctx, wantMsgs, producer) if err != nil { t.Fatalf("Error producing messages: %v", err) } + gotMessages := messagesMaps(len(consumers)) if tc.killConsumers { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. for i := 0; i < len(consumers); i += 3 { consumers[i].Start(ctx) - if _, err := consumers[i].Consume(ctx); err != nil { + req, err := consumers[i].Consume(ctx) + if err != nil { t.Errorf("Error consuming message: %v", err) } + if !tc.autoRecover { + gotMessages[i][req.ID] = req.Value.Request + } consumers[i].StopAndWait() } } time.Sleep(time.Second) - gotMessages, wantResponses := consume(ctx, t, consumers) - gotResponses, err := awaitResponses(ctx, promises) - if err != nil { - t.Fatalf("Error awaiting responses: %v", err) + wantResponses := consume(ctx, t, consumers, gotMessages) + gotResponses, errIndexes := awaitResponses(ctx, promises) + if len(errIndexes) != 0 && tc.autoRecover { + t.Fatalf("Error awaiting responses: %v", errIndexes) } producer.StopAndWait() for _, c := range consumers { @@ -254,7 +273,6 @@ func TestRedisProduce(t *testing.T) { if err != nil { t.Fatalf("mergeMaps() unexpected error: %v", err) } - if diff := cmp.Diff(wantMsgs, got); diff != "" { t.Errorf("Unexpected diff (-want +got):\n%s\n", diff) } @@ -266,57 +284,17 @@ func TestRedisProduce(t *testing.T) { if cnt := producer.promisesLen(); cnt != 0 { t.Errorf("Producer still has %d unfullfilled promises", cnt) } + msgs, err := redisClient.XRange(ctx, streamName, "-", "+").Result() + if err != nil { + t.Errorf("XRange failed: %v", err) + } + if len(msgs) != 0 { + t.Errorf("redis still has %v messages", len(msgs)) + } }) } } -func TestRedisReproduceDisabled(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - producer, consumers := newProducerConsumers(ctx, t, &disableReproduce{}) - producer.Start(ctx) - wantMsgs := wantMessages(messagesCount) - promises, err := produceMessages(ctx, wantMsgs, producer) - if err != nil { - t.Fatalf("Error producing messages: %v", err) - } - - // Consumer messages in every third consumer but don't ack them to check - // that other consumers will claim ownership on those messages. - for i := 0; i < len(consumers); i += 3 { - consumers[i].Start(ctx) - if _, err := consumers[i].Consume(ctx); err != nil { - t.Errorf("Error consuming message: %v", err) - } - consumers[i].StopAndWait() - } - - gotMessages, _ := consume(ctx, t, consumers) - gotResponses, err := awaitResponses(ctx, promises) - if err == nil { - t.Fatalf("All promises were fullfilled with reproduce disabled and some consumers killed") - } - producer.StopAndWait() - for _, c := range consumers { - c.StopWaiter.StopAndWait() - } - got, err := mergeValues(gotMessages) - if err != nil { - t.Fatalf("mergeMaps() unexpected error: %v", err) - } - wantMsgCnt := messagesCount - ((consumersCount + 2) / 3) - if len(got) != wantMsgCnt { - t.Fatalf("Got: %d messages, want %d", len(got), wantMsgCnt) - } - if len(gotResponses) != wantMsgCnt { - t.Errorf("Got %d responses want: %d\n", len(gotResponses), wantMsgCnt) - } - if cnt := producer.promisesLen(); cnt != 0 { - t.Errorf("Producer still has %d unfullfilled promises", cnt) - } -} - // mergeValues merges maps from the slice and returns their values. // Returns and error if there exists duplicate key. func mergeValues(messages []map[string]string) ([]string, error) { diff --git a/staker/block_validator.go b/staker/block_validator.go index d7b5f4f6a2..bfb7c24ac6 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -35,6 +35,8 @@ var ( validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) + validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) + validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) ) @@ -283,8 +285,9 @@ func NewBlockValidator( return ret, nil } -func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex) { +func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex, metr metrics.Gauge) { addr.Store(uint64(val)) + metr.Update(int64(val)) } func atomicLoadPos(addr *atomic.Uint64) arbutil.MessageIndex { @@ -588,7 +591,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e v.validations.Store(pos, status) v.nextCreateStartGS = endGS v.nextCreatePrevDelayed = msg.DelayedMessagesRead - atomicStorePos(&v.createdA, pos+1) + atomicStorePos(&v.createdA, pos+1, validatorMsgCountCreatedGauge) log.Trace("create validation entry: created", "pos", pos) return true, nil } @@ -667,7 +670,7 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro return false, err } pos += 1 - atomicStorePos(&v.recordSentA, pos) + atomicStorePos(&v.recordSentA, pos, validatorMsgCountRecordSentGauge) log.Trace("next record request: sent", "pos", pos) } @@ -778,11 +781,10 @@ validationsLoop: log.Error("failed writing new validated to database", "pos", pos, "err", err) } go v.recorder.MarkValid(pos, v.lastValidGS.BlockHash) - atomicStorePos(&v.validatedA, pos+1) + atomicStorePos(&v.validatedA, pos+1, validatorMsgCountValidatedGauge) v.validations.Delete(pos) nonBlockingTrigger(v.createNodesChan) nonBlockingTrigger(v.sendRecordChan) - validatorMsgCountValidatedGauge.Update(int64(pos + 1)) if v.testingProgressMadeChan != nil { nonBlockingTrigger(v.testingProgressMadeChan) } @@ -1222,9 +1224,9 @@ func (v *BlockValidator) checkValidatedGSCaughtUp() (bool, error) { v.nextCreateBatchReread = true v.nextCreateStartGS = v.lastValidGS v.nextCreatePrevDelayed = msg.DelayedMessagesRead - atomicStorePos(&v.createdA, count) - atomicStorePos(&v.recordSentA, count) - atomicStorePos(&v.validatedA, count) + atomicStorePos(&v.createdA, count, validatorMsgCountCreatedGauge) + atomicStorePos(&v.recordSentA, count, validatorMsgCountRecordSentGauge) + atomicStorePos(&v.validatedA, count, validatorMsgCountValidatedGauge) validatorMsgCountValidatedGauge.Update(int64(count)) v.chainCaughtUp = true return true, nil diff --git a/system_tests/common_test.go b/system_tests/common_test.go index ccee6aa8e4..b0748f8639 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -790,12 +790,12 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co conf.Wasm.RootPath = wasmRootDir // Enable redis streams when URL is specified if redisURL != "" { - conf.Arbitrator.RedisValidationServerConfig = rediscons.DefaultValidationServerConfig + conf.Arbitrator.RedisValidationServerConfig = rediscons.TestValidationServerConfig redisClient, err := redisutil.RedisClientFromURL(redisURL) if err != nil { t.Fatalf("Error creating redis coordinator: %v", err) } - redisStream := server_api.RedisStreamForRoot(currentRootModule(t)) + redisStream := server_api.RedisStreamForRoot(rediscons.TestValidationServerConfig.StreamPrefix, currentRootModule(t)) createRedisGroup(ctx, t, redisStream, redisClient) conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL t.Cleanup(func() { destroyRedisGroup(ctx, t, redisStream, redisClient) }) diff --git a/system_tests/fees_test.go b/system_tests/fees_test.go index 4d8fbf43fd..ccca82e009 100644 --- a/system_tests/fees_test.go +++ b/system_tests/fees_test.go @@ -2,8 +2,10 @@ // For license information, see https://github.com/nitro/blob/master/LICENSE // these tests seems to consume too much memory with race detection -//go:build !race -// +build !race +// Test randomly fails with L1 gas price estimate should tend toward the basefee +// so skipping locally, but running on CI +//go:build !race && cionly +// +build !race,cionly package arbtest diff --git a/system_tests/program_test.go b/system_tests/program_test.go index b05589a1bf..b748739cdc 100644 --- a/system_tests/program_test.go +++ b/system_tests/program_test.go @@ -1668,20 +1668,12 @@ func formatTime(duration time.Duration) string { return fmt.Sprintf("%.2f%s", span, units[unit]) } -func TestWasmRecreate(t *testing.T) { - builder, auth, cleanup := setupProgramTest(t, true) +func testWasmRecreate(t *testing.T, builder *NodeBuilder, storeTx *types.Transaction, loadTx *types.Transaction, want []byte) { ctx := builder.ctx l2info := builder.L2Info l2client := builder.L2.Client - defer cleanup() - - storage := deployWasm(t, ctx, auth, l2client, rustFile("storage")) - - zero := common.Hash{} - val := common.HexToHash("0x121233445566") // do an onchain call - store value - storeTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageWrite(zero, val)) Require(t, l2client.SendTransaction(ctx, storeTx)) _, err := EnsureTxSucceeded(ctx, l2client, storeTx) Require(t, err) @@ -1694,11 +1686,10 @@ func TestWasmRecreate(t *testing.T) { Require(t, err) // make sure reading 2nd value succeeds from 2nd node - loadTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageRead(zero)) result, err := arbutil.SendTxAsCall(ctx, nodeB.Client, loadTx, l2info.GetAddress("Owner"), nil, true) Require(t, err) - if common.BytesToHash(result) != val { - Fatal(t, "got wrong value") + if !bytes.Equal(result, want) { + t.Fatalf("got wrong value, got %x, want %x", result, want) } // close nodeB cleanupB() @@ -1723,8 +1714,8 @@ func TestWasmRecreate(t *testing.T) { // test nodeB - answers eth_call (requires reloading wasm) result, err = arbutil.SendTxAsCall(ctx, nodeB.Client, loadTx, l2info.GetAddress("Owner"), nil, true) Require(t, err) - if common.BytesToHash(result) != val { - Fatal(t, "got wrong value") + if !bytes.Equal(result, want) { + t.Fatalf("got wrong value, got %x, want %x", result, want) } // send new tx (requires wasm) and check nodeB sees it as well @@ -1743,7 +1734,46 @@ func TestWasmRecreate(t *testing.T) { Fatal(t, "not contents found before delete") } os.RemoveAll(wasmPath) +} + +func TestWasmRecreate(t *testing.T) { + builder, auth, cleanup := setupProgramTest(t, true) + ctx := builder.ctx + l2info := builder.L2Info + l2client := builder.L2.Client + defer cleanup() + + storage := deployWasm(t, ctx, auth, l2client, rustFile("storage")) + + zero := common.Hash{} + val := common.HexToHash("0x121233445566") + + storeTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageWrite(zero, val)) + loadTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageRead(zero)) + + testWasmRecreate(t, builder, storeTx, loadTx, val[:]) +} + +func TestWasmRecreateWithDelegatecall(t *testing.T) { + builder, auth, cleanup := setupProgramTest(t, true) + ctx := builder.ctx + l2info := builder.L2Info + l2client := builder.L2.Client + defer cleanup() + + storage := deployWasm(t, ctx, auth, l2client, rustFile("storage")) + multicall := deployWasm(t, ctx, auth, l2client, rustFile("multicall")) + + zero := common.Hash{} + val := common.HexToHash("0x121233445566") + + data := argsForMulticall(vm.DELEGATECALL, storage, big.NewInt(0), argsForStorageWrite(zero, val)) + storeTx := l2info.PrepareTxTo("Owner", &multicall, l2info.TransferGas, nil, data) + + data = argsForMulticall(vm.DELEGATECALL, storage, big.NewInt(0), argsForStorageRead(zero)) + loadTx := l2info.PrepareTxTo("Owner", &multicall, l2info.TransferGas, nil, data) + testWasmRecreate(t, builder, storeTx, loadTx, val[:]) } // createMapFromDb is used in verifying if wasm store rebuilding works diff --git a/system_tests/test_info.go b/system_tests/test_info.go index a4fbb44c44..6313e392ca 100644 --- a/system_tests/test_info.go +++ b/system_tests/test_info.go @@ -16,7 +16,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -109,8 +108,8 @@ func (b *BlockchainTestInfo) GenerateGenesisAccount(name string, balance *big.In }) } -func (b *BlockchainTestInfo) GetGenesisAlloc() core.GenesisAlloc { - alloc := make(core.GenesisAlloc) +func (b *BlockchainTestInfo) GetGenesisAlloc() types.GenesisAlloc { + alloc := make(types.GenesisAlloc) for _, info := range b.ArbInitData.Accounts { var contractCode []byte contractStorage := make(map[common.Hash]common.Hash) @@ -120,7 +119,7 @@ func (b *BlockchainTestInfo) GetGenesisAlloc() core.GenesisAlloc { contractStorage[k] = v } } - alloc[info.Addr] = core.GenesisAccount{ + alloc[info.Addr] = types.Account{ Balance: new(big.Int).Set(info.EthBalance), Nonce: info.Nonce, Code: contractCode, diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 7594421f9a..0adedc6784 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -20,6 +20,7 @@ import ( type ValidationClientConfig struct { Name string `koanf:"name"` + StreamPrefix string `koanf:"stream-prefix"` Room int32 `koanf:"room"` RedisURL string `koanf:"redis-url"` ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"` @@ -42,6 +43,7 @@ var TestValidationClientConfig = ValidationClientConfig{ Name: "test redis validation client", Room: 2, RedisURL: "", + StreamPrefix: "test-", ProducerConfig: pubsub.TestProducerConfig, CreateStreams: false, } @@ -50,6 +52,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name") f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room") f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url") + f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name") pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } @@ -57,14 +60,12 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { // ValidationClient implements validation client through redis streams. type ValidationClient struct { stopwaiter.StopWaiter - name string - room atomic.Int32 + config *ValidationClientConfig + room atomic.Int32 // producers stores moduleRoot to producer mapping. - producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] - producerConfig pubsub.ProducerConfig - redisClient redis.UniversalClient - moduleRoots []common.Hash - createStreams bool + producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] + redisClient redis.UniversalClient + moduleRoots []common.Hash } func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) { @@ -76,11 +77,9 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) return nil, err } validationClient := &ValidationClient{ - name: cfg.Name, - producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), - producerConfig: cfg.ProducerConfig, - redisClient: redisClient, - createStreams: cfg.CreateStreams, + config: cfg, + producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), + redisClient: redisClient, } validationClient.room.Store(cfg.Room) return validationClient, nil @@ -88,8 +87,8 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { - if c.createStreams { - if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + if c.config.CreateStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil { return fmt.Errorf("creating redis stream: %w", err) } } @@ -98,7 +97,7 @@ func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common. continue } p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState]( - c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig) + c.redisClient, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig) if err != nil { log.Warn("failed init redis for %v: %w", mr, err) continue @@ -146,10 +145,7 @@ func (c *ValidationClient) Stop() { } func (c *ValidationClient) Name() string { - if c.Started() { - return c.name - } - return "(not started)" + return c.config.Name } func (c *ValidationClient) Room() int { diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 3dd817d5ae..dd646e1aa1 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -45,8 +45,8 @@ func MachineStepResultFromJson(resultJson *MachineStepResultJson) (*validator.Ma }, nil } -func RedisStreamForRoot(moduleRoot common.Hash) string { - return fmt.Sprintf("stream:%s", moduleRoot.Hex()) +func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string { + return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex()) } type Request struct { diff --git a/validator/server_arb/execution_run.go b/validator/server_arb/execution_run.go index 8bdce145a2..d29a88d34d 100644 --- a/validator/server_arb/execution_run.go +++ b/validator/server_arb/execution_run.go @@ -24,7 +24,7 @@ type executionRun struct { close sync.Once } -// NewExecutionChallengeBackend creates a backend with the given arguments. +// NewExecutionRun creates a backend with the given arguments. // Note: machineCache may be nil, but if present, it must not have a restricted range. func NewExecutionRun( ctxIn context.Context, diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 84f597c095..fb7db1e870 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -37,7 +37,7 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState]) for _, hash := range cfg.ModuleRoots { mr := common.HexToHash(hash) - c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig) + c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig) if err != nil { return nil, fmt.Errorf("creating consumer for validation: %w", err) } @@ -130,10 +130,12 @@ type ValidationServerConfig struct { ModuleRoots []string `koanf:"module-roots"` // Timeout on polling for existence of each redis stream. StreamTimeout time.Duration `koanf:"stream-timeout"` + StreamPrefix string `koanf:"stream-prefix"` } var DefaultValidationServerConfig = ValidationServerConfig{ RedisURL: "", + StreamPrefix: "", ConsumerConfig: pubsub.DefaultConsumerConfig, ModuleRoots: []string{}, StreamTimeout: 10 * time.Minute, @@ -141,6 +143,7 @@ var DefaultValidationServerConfig = ValidationServerConfig{ var TestValidationServerConfig = ValidationServerConfig{ RedisURL: "", + StreamPrefix: "test-", ConsumerConfig: pubsub.TestConsumerConfig, ModuleRoots: []string{}, StreamTimeout: time.Minute, @@ -150,6 +153,7 @@ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server") + f.String(prefix+".stream-prefix", DefaultValidationServerConfig.StreamPrefix, "prefix for stream name") f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") }