Skip to content

Commit

Permalink
Merge branch 'master' into update-gethpin-v1.13.4
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli authored Apr 16, 2024
2 parents 0344f66 + 1dd4fb3 commit 5a649cd
Show file tree
Hide file tree
Showing 11 changed files with 933 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Arbitrum One successfully migrated from the Classic Arbitrum stack onto Nitro on

## License

Nitro is currently licensed under a [Business Source License](./LICENSE), similar to our friends at Uniswap and Aave, with an "Additional Use Grant" to ensure that everyone can have full comfort using and running nodes on all public Arbitrum chains.
Nitro is currently licensed under a [Business Source License](./LICENSE.md), similar to our friends at Uniswap and Aave, with an "Additional Use Grant" to ensure that everyone can have full comfort using and running nodes on all public Arbitrum chains.

The Additional Use Grant also permits the deployment of the Nitro software, in a permissionless fashion and without cost, as a new blockchain provided that the chain settles to either Arbitrum One or Arbitrum Nova.

Expand Down
9 changes: 4 additions & 5 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
Enable: false,
DisableDasFallbackStoreDataOnChain: false,
// This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go
MaxSize: 100000,
// TODO: is 1000 bytes an appropriate margin for error vs blob space efficiency?
Max4844BatchSize: blobs.BlobEncodableData*(params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob) - 1000,
MaxSize: 100000,
Max4844BatchSize: blobs.BlobEncodableData*(params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob) - 2000,
PollInterval: time.Second * 10,
ErrorDelay: time.Second * 10,
MaxDelay: time.Hour,
Expand Down Expand Up @@ -491,7 +490,7 @@ func (b *BatchPoster) checkReverts(ctx context.Context, to int64) (bool, error)
return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash, err)
}
if r.Status == types.ReceiptStatusFailed {
shouldHalt := !b.config().DataPoster.UseNoOpStorage
shouldHalt := !b.dataPoster.UsingNoOpStorage()
logLevel := log.Warn
if shouldHalt {
logLevel = log.Error
Expand Down Expand Up @@ -1278,7 +1277,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
return false, err
}
if len(kzgBlobs)*params.BlobTxBlobGasPerBlob > params.MaxBlobGasPerBlock {
return false, fmt.Errorf("produced %v blobs for batch but a block can only hold %v", len(kzgBlobs), params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob)
return false, fmt.Errorf("produced %v blobs for batch but a block can only hold %v (compressed batch was %v bytes long)", len(kzgBlobs), params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob, len(sequencerMsg))
}
accessList := b.accessList(int(batchPosition.NextSeqNum), int(b.building.segments.delayedMsg))
// On restart, we may be trying to estimate gas for a batch whose successor has
Expand Down
4 changes: 4 additions & 0 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ func (p *DataPoster) MaxMempoolTransactions() uint64 {
return arbmath.MinInt(config.MaxMempoolTransactions, config.MaxMempoolWeight)
}

func (p *DataPoster) UsingNoOpStorage() bool {
return p.usingNoOpStorage
}

var ErrExceedsMaxMempoolSize = errors.New("posting this transaction will exceed max mempool size")

// Does basic check whether posting transaction with specified nonce would
Expand Down
5 changes: 3 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,9 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut
return s.exec.ResultAtPos(count - 1)
}

// exposed for testing
// return value: true if should be called again immediately
func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution.ExecutionSequencer) bool {
func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution.ExecutionSequencer) bool {
if ctx.Err() != nil {
return false
}
Expand Down Expand Up @@ -1117,7 +1118,7 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
}

func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struct{}) time.Duration {
if s.executeNextMsg(ctx, s.exec) {
if s.ExecuteNextMsg(ctx, s.exec) {
return 0
}
return s.config().ExecuteMessageLoopDelay
Expand Down
19 changes: 13 additions & 6 deletions execution/gethexec/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,23 @@ func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonit

func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} {
res := s.consensus.FullSyncProgressMap()
consensusSyncTarget := s.consensus.SyncTargetMessageCount()

built, err := s.exec.HeadMessageNumber()
res["consensusSyncTarget"] = s.consensus.SyncTargetMessageCount()

header, err := s.exec.getCurrentHeader()
if err != nil {
res["headMsgNumberError"] = err
res["currentHeaderError"] = err
} else {
blockNum := header.Number.Uint64()
res["blockNum"] = blockNum
messageNum, err := s.exec.BlockNumberToMessageIndex(blockNum)
if err != nil {
res["messageOfLastBlockError"] = err
} else {
res["messageOfLastBlock"] = messageNum
}
}

res["builtBlock"] = built
res["consensusSyncTarget"] = consensusSyncTarget

return res
}

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace github.com/ethereum/go-ethereum => ./go-ethereum
require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/alicebob/miniredis/v2 v2.21.0
github.com/alicebob/miniredis/v2 v2.32.1
github.com/andybalholm/brotli v1.0.4
github.com/aws/aws-sdk-go-v2 v1.21.2
github.com/aws/aws-sdk-go-v2/config v1.18.45
Expand Down Expand Up @@ -261,7 +261,7 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect
Expand Down Expand Up @@ -318,7 +318,7 @@ require (
github.com/go-redis/redis/v8 v8.11.4
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/go-bexpr v0.1.10 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.21.0 h1:CdmwIlKUWFBDS+4464GtQiQ0R1vpzOgu4Vnd74rBL7M=
github.com/alicebob/miniredis/v2 v2.21.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/alicebob/miniredis/v2 v2.32.1 h1:Bz7CciDnYSaa0mX5xODh6GUITRSx+cVhjNoOR4JssBo=
github.com/alicebob/miniredis/v2 v2.32.1/go.mod h1:AqkLNAfUm0K07J28hnAyyQKf/x0YkCY/g5DCtuL01Mw=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
Expand Down Expand Up @@ -1695,8 +1695,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
Expand Down
177 changes: 177 additions & 0 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package pubsub

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)

type ConsumerConfig struct {
// Timeout of result entry in Redis.
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
// Duration after which consumer is considered to be dead if heartbeat
// is not updated.
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Redis url for Redis streams and locks.
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultConsumerConfig = &ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
RedisStream: "",
RedisGroup: "",
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".keepalive-timeout", DefaultConsumerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-url", DefaultConsumerConfig.RedisURL, "redis url for redis stream")
f.String(prefix+".redis-stream", DefaultConsumerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultConsumerConfig.RedisGroup, "redis stream consumer group name")
}

// Consumer implements a consumer for redis stream provides heartbeat to
// indicate it is alive.
type Consumer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ConsumerConfig
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumer := &Consumer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
}
return consumer, nil
}

// Start starts the consumer to iteratively perform heartbeat in configured intervals.
func (c *Consumer[Request, Response]) Start(ctx context.Context) {
c.StopWaiter.Start(ctx, c)
c.StopWaiter.CallIteratively(
func(ctx context.Context) time.Duration {
c.heartBeat(ctx)
return c.cfg.KeepAliveTimeout / 10
},
)
}

func (c *Consumer[Request, Response]) StopAndWait() {
c.StopWaiter.StopAndWait()
}

func heartBeatKey(id string) string {
return fmt.Sprintf("consumer:%s:heartbeat", id)
}

func (c *Consumer[Request, Response]) heartBeatKey() string {
return heartBeatKey(c.id)
}

// heartBeat updates the heartBeat key indicating aliveness.
func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
if err := c.client.Set(ctx, c.heartBeatKey(), time.Now().UnixMilli(), 2*c.cfg.KeepAliveTimeout).Err(); err != nil {
l := log.Info
if ctx.Err() != nil {
l = log.Error
}
l("Updating heardbeat", "consumer", c.id, "error", err)
}
}

// Consumer first checks it there exists pending message that is claimed by
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.cfg.RedisGroup,
Consumer: c.id,
// Receive only messages that were never delivered to any other consumer,
// that is, only new messages.
Streams: []string{c.cfg.RedisStream, ">"},
Count: 1,
Block: time.Millisecond, // 0 seems to block the read instead of immediately returning
}).Result()
if errors.Is(err, redis.Nil) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("reading message for consumer: %q: %w", c.id, err)
}
if len(res) != 1 || len(res[0].Messages) != 1 {
return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res)
}
log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID))
var (
value = res[0].Messages[0].Values[messageKey]
data, ok = (value).(string)
)
if !ok {
return nil, fmt.Errorf("casting request to string: %w", err)
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

return &Message[Request]{
ID: res[0].Messages[0].ID,
Value: req,
}, nil
}

func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID string, result Response) error {
resp, err := json.Marshal(result)
if err != nil {
return fmt.Errorf("marshaling result: %w", err)
}
acquired, err := c.client.SetNX(ctx, messageID, resp, c.cfg.ResponseEntryTimeout).Result()
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
if _, err := c.client.XAck(ctx, c.cfg.RedisStream, c.cfg.RedisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
return nil
}
Loading

0 comments on commit 5a649cd

Please sign in to comment.