diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index e9cfe1dd3a..32b6175108 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -53,8 +53,18 @@ import ( ) var ( - batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/balanceether", nil) - batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/balanceether", nil) + batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/eth", nil) + batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/eth", nil) + baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) + blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil) + l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil) + l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil) + latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil) + blockGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/used", nil) + blockGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/limit", nil) + blobGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/used", nil) + blobGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/limit", nil) + suggestedTipCapGauge = metrics.NewRegisteredGauge("arb/batchposter/suggestedtipcap", nil) usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32)) blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) @@ -510,6 +520,49 @@ func (b *BatchPoster) checkReverts(ctx context.Context, to int64) (bool, error) return false, nil } +func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { + headerCh, unsubscribe := b.l1Reader.Subscribe(false) + defer unsubscribe() + + blobGasLimitGauge.Update(params.MaxBlobGasPerBlock) + for { + select { + case h, ok := <-headerCh: + if !ok { + log.Info("L1 headers channel checking for l1 price data has been closed") + return + } + baseFeeGauge.Update(h.BaseFee.Int64()) + l1GasPrice := h.BaseFee.Uint64() + if h.BlobGasUsed != nil { + if h.ExcessBlobGas != nil { + blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) + blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) + blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) + blobFeeGauge.Update(blobFeePerByte.Int64()) + if l1GasPrice > blobFeePerByte.Uint64()/16 { + l1GasPrice = blobFeePerByte.Uint64() / 16 + } + } + blobGasUsedGauge.Update(int64(*h.BlobGasUsed)) + } + blockGasUsedGauge.Update(int64(h.GasUsed)) + blockGasLimitGauge.Update(int64(h.GasLimit)) + suggestedTipCap, err := b.l1Reader.Client().SuggestGasTipCap(ctx) + if err != nil { + log.Error("unable to fetch suggestedTipCap from l1 client to update arb/batchposter/suggestedtipcap metric", "err", err) + } else { + suggestedTipCapGauge.Update(suggestedTipCap.Int64()) + } + l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice() + l1GasPriceGauge.Update(int64(l1GasPrice)) + l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate)) + case <-ctx.Done(): + return + } + } +} + // pollForReverts runs a gouroutine that listens to l1 block headers, checks // if any transaction made by batch poster was reverted. func (b *BatchPoster) pollForReverts(ctx context.Context) { @@ -1272,6 +1325,14 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "numBlobs", len(kzgBlobs), ) + surplus := arbmath.SaturatingMul( + arbmath.SaturatingSub( + l1GasPriceGauge.Snapshot().Value(), + l1GasPriceEstimateGauge.Snapshot().Value()), + int64(len(sequencerMsg)*16), + ) + latestBatchSurplusGauge.Update(surplus) + recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 postedMessages := b.building.msgCount - batchPosition.MessageCount b.messagesPerBatch.Update(uint64(postedMessages)) @@ -1341,6 +1402,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) b.LaunchThread(b.pollForReverts) + b.LaunchThread(b.pollForL1PriceData) commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0) exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute) storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute) diff --git a/arbnode/node.go b/arbnode/node.go index 29c1071b7a..7a7a99ba88 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -756,6 +756,17 @@ func CreateNode( return currentNode, nil } +func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { + n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged) +} + +func (n *Node) BacklogL1GasCharged() uint64 { + return n.TxStreamer.BacklogL1GasCharged() +} +func (n *Node) BacklogCallDataUnits() uint64 { + return n.TxStreamer.BacklogCallDataUnits() +} + func (n *Node) Start(ctx context.Context) error { execClient, ok := n.Execution.(*gethexec.ExecutionNode) if !ok { diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index fa161db6c5..7d24005bcd 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -68,6 +68,9 @@ type TransactionStreamer struct { broadcastServer *broadcaster.Broadcaster inboxReader *InboxReader delayedBridge *DelayedBridge + + cachedL1PriceDataMutex sync.RWMutex + cachedL1PriceData *L1PriceData } type TransactionStreamerConfig struct { @@ -112,6 +115,9 @@ func NewTransactionStreamer( broadcastServer: broadcastServer, fatalErrChan: fatalErrChan, config: config, + cachedL1PriceData: &L1PriceData{ + msgToL1PriceData: []L1PriceDataOfMsg{}, + }, } err := streamer.cleanupInconsistentState() if err != nil { @@ -120,6 +126,120 @@ func NewTransactionStreamer( return streamer, nil } +type L1PriceDataOfMsg struct { + callDataUnits uint64 + cummulativeCallDataUnits uint64 + l1GasCharged uint64 + cummulativeL1GasCharged uint64 +} + +type L1PriceData struct { + startOfL1PriceDataCache arbutil.MessageIndex + endOfL1PriceDataCache arbutil.MessageIndex + msgToL1PriceData []L1PriceDataOfMsg + currentEstimateOfL1GasPrice uint64 +} + +func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { + s.cachedL1PriceDataMutex.Lock() + defer s.cachedL1PriceDataMutex.Unlock() + + currentEstimate, err := s.exec.GetL1GasPriceEstimate() + if err != nil { + log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err) + } else { + s.cachedL1PriceData.currentEstimateOfL1GasPrice = currentEstimate + } + return s.cachedL1PriceData.currentEstimateOfL1GasPrice +} + +func (s *TransactionStreamer) BacklogCallDataUnits() uint64 { + s.cachedL1PriceDataMutex.RLock() + defer s.cachedL1PriceDataMutex.RUnlock() + + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits + + s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits) +} + +func (s *TransactionStreamer) BacklogL1GasCharged() uint64 { + s.cachedL1PriceDataMutex.RLock() + defer s.cachedL1PriceDataMutex.RUnlock() + + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged + + s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) +} + +func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) { + s.cachedL1PriceDataMutex.Lock() + defer s.cachedL1PriceDataMutex.Unlock() + + if to < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("trying to trim older cache which doesnt exist anymore") + } else if to >= s.cachedL1PriceData.endOfL1PriceDataCache { + s.cachedL1PriceData.startOfL1PriceDataCache = 0 + s.cachedL1PriceData.endOfL1PriceDataCache = 0 + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{} + } else { + newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1 + s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:] + s.cachedL1PriceData.startOfL1PriceDataCache = to + 1 + } +} + +func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { + s.cachedL1PriceDataMutex.Lock() + defer s.cachedL1PriceDataMutex.Unlock() + + resetCache := func() { + s.cachedL1PriceData.startOfL1PriceDataCache = seqNum + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: l1GasCharged, + }} + } + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 || + s.cachedL1PriceData.startOfL1PriceDataCache == 0 || + s.cachedL1PriceData.endOfL1PriceDataCache == 0 || + arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 { + resetCache() + return + } + if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 { + if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 { + log.Info("message position higher then current end of l1 price data cache, resetting cache to this message") + resetCache() + } else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("message position lower than start of l1 price data cache, ignoring") + } else { + log.Info("message position already seen in l1 price data cache, ignoring") + } + } else { + cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits + cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged + s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged, + }) + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + } +} + // Encodes a uint64 as bytes in a lexically sortable manner for database iteration. // Generally this is only used for database keys, which need sorted. // A shorter RLP encoding is usually used for database values. @@ -577,6 +697,8 @@ func endBatch(batch ethdb.Batch) error { func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { if messagesAreConfirmed { + // Trim confirmed messages from l1pricedataCache + s.TrimCache(pos + arbutil.MessageIndex(len(messages))) s.reorgMutex.RLock() dups, _, _, err := s.countDuplicateMessages(pos, messages, nil) s.reorgMutex.RUnlock() diff --git a/arbos/l1pricing/l1pricing.go b/arbos/l1pricing/l1pricing.go index f2312c46d4..9e00eeb581 100644 --- a/arbos/l1pricing/l1pricing.go +++ b/arbos/l1pricing/l1pricing.go @@ -195,6 +195,23 @@ func (ps *L1PricingState) SetUnitsSinceUpdate(units uint64) error { return ps.unitsSinceUpdate.Set(units) } +func (ps *L1PricingState) GetL1PricingSurplus() (*big.Int, error) { + fundsDueForRefunds, err := ps.BatchPosterTable().TotalFundsDue() + if err != nil { + return nil, err + } + fundsDueForRewards, err := ps.FundsDueForRewards() + if err != nil { + return nil, err + } + haveFunds, err := ps.L1FeesAvailable() + if err != nil { + return nil, err + } + needFunds := arbmath.BigAdd(fundsDueForRefunds, fundsDueForRewards) + return arbmath.BigSub(haveFunds, needFunds), nil +} + func (ps *L1PricingState) LastSurplus() (*big.Int, error) { return ps.lastSurplus.Get() } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 526bf82bf5..f9e98b3a6a 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -378,6 +378,8 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. return nil, err } + s.cacheL1PriceDataOfMsg(pos, receipts, block) + return block, nil } @@ -529,6 +531,55 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } +func (s *ExecutionEngine) GetL1GasPriceEstimate() (uint64, error) { + bc := s.bc + latestHeader := bc.CurrentBlock() + latestState, err := bc.StateAt(latestHeader.Root) + if err != nil { + return 0, errors.New("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + } + arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) + if err != nil { + return 0, errors.New("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + } + l2EstimateL1GasPrice, err := arbState.L1PricingState().PricePerUnit() + if err != nil { + return 0, errors.New("error fetching l2 Estimate of L1 GasPrice") + } + return l2EstimateL1GasPrice.Uint64(), nil +} + +func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) { + bc := s.bc + latestHeader := bc.CurrentBlock() + latestState, err := bc.StateAt(latestHeader.Root) + if err != nil { + return 0, errors.New("error getting latest statedb while fetching current L1 pricing surplus") + } + arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) + if err != nil { + return 0, errors.New("error opening system arbos state while fetching current L1 pricing surplus") + } + surplus, err := arbState.L1PricingState().GetL1PricingSurplus() + if err != nil { + return 0, errors.New("error fetching current L1 pricing surplus") + } + return surplus.Int64(), nil +} + +func (s *ExecutionEngine) cacheL1PriceDataOfMsg(num arbutil.MessageIndex, receipts types.Receipts, block *types.Block) { + var gasUsedForL1 uint64 + for i := 1; i < len(receipts); i++ { + gasUsedForL1 += receipts[i].GasUsedForL1 + } + gasChargedForL1 := gasUsedForL1 * block.BaseFee().Uint64() + var callDataUnits uint64 + for _, tx := range block.Transactions() { + callDataUnits += tx.CalldataUnits + } + s.consensus.CacheL1PriceDataOfMsg(num, callDataUnits, gasChargedForL1) +} + // DigestMessage is used to create a block by executing msg against the latest state and storing it. // Also, while creating a block by executing msg against the latest state, // in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 2053f86aa3..88c1410031 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -275,6 +275,10 @@ func CreateExecutionNode( } +func (n *ExecutionNode) GetL1GasPriceEstimate() (uint64, error) { + return n.ExecEngine.GetL1GasPriceEstimate() +} + func (n *ExecutionNode) Initialize(ctx context.Context) error { n.ArbInterface.Initialize(n) err := n.Backend.Start() diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 5befe3c374..63461cd33f 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -10,6 +10,7 @@ import ( "math" "math/big" "runtime/debug" + "strconv" "strings" "sync" "sync/atomic" @@ -25,10 +26,12 @@ import ( "github.com/ethereum/go-ethereum/arbitrum" "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -51,21 +54,30 @@ var ( successfulBlocksCounter = metrics.NewRegisteredCounter("arb/sequencer/block/successful", nil) conditionalTxRejectedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/condtionaltx/rejected", nil) conditionalTxAcceptedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/condtionaltx/accepted", nil) + l1GasPriceGauge = metrics.NewRegisteredGauge("arb/sequencer/l1gasprice", nil) + callDataUnitsBacklogGauge = metrics.NewRegisteredGauge("arb/sequencer/calldataunitsbacklog", nil) + unusedL1GasChargeGauge = metrics.NewRegisteredGauge("arb/sequencer/unusedl1gascharge", nil) + currentSurplusGauge = metrics.NewRegisteredGauge("arb/sequencer/currentsurplus", nil) + expectedSurplusGauge = metrics.NewRegisteredGauge("arb/sequencer/expectedsurplus", nil) ) type SequencerConfig struct { - Enable bool `koanf:"enable"` - MaxBlockSpeed time.Duration `koanf:"max-block-speed" reload:"hot"` - MaxRevertGasReject uint64 `koanf:"max-revert-gas-reject" reload:"hot"` - MaxAcceptableTimestampDelta time.Duration `koanf:"max-acceptable-timestamp-delta" reload:"hot"` - SenderWhitelist string `koanf:"sender-whitelist"` - Forwarder ForwarderConfig `koanf:"forwarder"` - QueueSize int `koanf:"queue-size"` - QueueTimeout time.Duration `koanf:"queue-timeout" reload:"hot"` - NonceCacheSize int `koanf:"nonce-cache-size" reload:"hot"` - MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` - NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` - NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + Enable bool `koanf:"enable"` + MaxBlockSpeed time.Duration `koanf:"max-block-speed" reload:"hot"` + MaxRevertGasReject uint64 `koanf:"max-revert-gas-reject" reload:"hot"` + MaxAcceptableTimestampDelta time.Duration `koanf:"max-acceptable-timestamp-delta" reload:"hot"` + SenderWhitelist string `koanf:"sender-whitelist"` + Forwarder ForwarderConfig `koanf:"forwarder"` + QueueSize int `koanf:"queue-size"` + QueueTimeout time.Duration `koanf:"queue-timeout" reload:"hot"` + NonceCacheSize int `koanf:"nonce-cache-size" reload:"hot"` + MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` + NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` + NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"` + ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"` + expectedSurplusSoftThreshold int + expectedSurplusHardThreshold int } func (c *SequencerConfig) Validate() error { @@ -78,6 +90,20 @@ func (c *SequencerConfig) Validate() error { return fmt.Errorf("sequencer sender whitelist entry \"%v\" is not a valid address", address) } } + var err error + if c.ExpectedSurplusSoftThreshold != "default" { + if c.expectedSurplusSoftThreshold, err = strconv.Atoi(c.ExpectedSurplusSoftThreshold); err != nil { + return fmt.Errorf("invalid expected-surplus-soft-threshold value provided in batchposter config %w", err) + } + } + if c.ExpectedSurplusHardThreshold != "default" { + if c.expectedSurplusHardThreshold, err = strconv.Atoi(c.ExpectedSurplusHardThreshold); err != nil { + return fmt.Errorf("invalid expected-surplus-hard-threshold value provided in batchposter config %w", err) + } + } + if c.expectedSurplusSoftThreshold < c.expectedSurplusHardThreshold { + return errors.New("expected-surplus-soft-threshold cannot be lower than expected-surplus-hard-threshold") + } return nil } @@ -94,24 +120,28 @@ var DefaultSequencerConfig = SequencerConfig{ NonceCacheSize: 1024, // 95% of the default batch poster limit, leaving 5KB for headers and such // This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go - MaxTxDataSize: 95000, - NonceFailureCacheSize: 1024, - NonceFailureCacheExpiry: time.Second, + MaxTxDataSize: 95000, + NonceFailureCacheSize: 1024, + NonceFailureCacheExpiry: time.Second, + ExpectedSurplusSoftThreshold: "default", + ExpectedSurplusHardThreshold: "default", } var TestSequencerConfig = SequencerConfig{ - Enable: true, - MaxBlockSpeed: time.Millisecond * 10, - MaxRevertGasReject: params.TxGas + 10000, - MaxAcceptableTimestampDelta: time.Hour, - SenderWhitelist: "", - Forwarder: DefaultTestForwarderConfig, - QueueSize: 128, - QueueTimeout: time.Second * 5, - NonceCacheSize: 4, - MaxTxDataSize: 95000, - NonceFailureCacheSize: 1024, - NonceFailureCacheExpiry: time.Second, + Enable: true, + MaxBlockSpeed: time.Millisecond * 10, + MaxRevertGasReject: params.TxGas + 10000, + MaxAcceptableTimestampDelta: time.Hour, + SenderWhitelist: "", + Forwarder: DefaultTestForwarderConfig, + QueueSize: 128, + QueueTimeout: time.Second * 5, + NonceCacheSize: 4, + MaxTxDataSize: 95000, + NonceFailureCacheSize: 1024, + NonceFailureCacheExpiry: time.Second, + ExpectedSurplusSoftThreshold: "default", + ExpectedSurplusHardThreshold: "default", } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -127,6 +157,8 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept") f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor") f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high") + f.String(prefix+".expected-surplus-soft-threshold", DefaultSequencerConfig.ExpectedSurplusSoftThreshold, "if expected surplus is lower than this value, warnings are posted") + f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, new incoming transactions will be denied") } type txQueueItem struct { @@ -291,6 +323,10 @@ type Sequencer struct { activeMutex sync.Mutex pauseChan chan struct{} forwarder *TxForwarder + + expectedSurplusMutex sync.RWMutex + expectedSurplus int64 + expectedSurplusUpdated bool } func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) { @@ -364,6 +400,16 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context } func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { + // Only try to acquire Rlock and check for hard threshold if l1reader is not nil + // And hard threshold was enabled, this prevents spamming of read locks when not needed + if s.l1Reader != nil && s.config().ExpectedSurplusHardThreshold != "default" { + s.expectedSurplusMutex.RLock() + if s.expectedSurplusUpdated && s.expectedSurplus < int64(s.config().expectedSurplusHardThreshold) { + return errors.New("currently not accepting transactions due to expected surplus being below threshold") + } + s.expectedSurplusMutex.RUnlock() + } + sequencerBacklogGauge.Inc(1) defer sequencerBacklogGauge.Dec(1) @@ -944,14 +990,83 @@ func (s *Sequencer) Initialize(ctx context.Context) error { return nil } +var ( + usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32)) + blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) +) + +func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) { + header, err := s.l1Reader.LastHeader(ctx) + if err != nil { + return 0, fmt.Errorf("error encountered getting latest header from l1reader while updating expectedSurplus: %w", err) + } + l1GasPrice := header.BaseFee.Uint64() + if header.BlobGasUsed != nil { + if header.ExcessBlobGas != nil { + blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*header.ExcessBlobGas, *header.BlobGasUsed)) + blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) + blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) + if l1GasPrice > blobFeePerByte.Uint64()/16 { + l1GasPrice = blobFeePerByte.Uint64() / 16 + } + } + } + surplus, err := s.execEngine.getL1PricingSurplus() + if err != nil { + return 0, fmt.Errorf("error encountered getting l1 pricing surplus while updating expectedSurplus: %w", err) + } + backlogL1GasCharged := int64(s.execEngine.consensus.BacklogL1GasCharged()) + backlogCallDataUnits := int64(s.execEngine.consensus.BacklogCallDataUnits()) + expectedSurplus := int64(surplus) + backlogL1GasCharged - backlogCallDataUnits*int64(l1GasPrice) + // update metrics + l1GasPriceGauge.Update(int64(l1GasPrice)) + callDataUnitsBacklogGauge.Update(backlogCallDataUnits) + unusedL1GasChargeGauge.Update(backlogL1GasCharged) + currentSurplusGauge.Update(surplus) + expectedSurplusGauge.Update(expectedSurplus) + if s.config().ExpectedSurplusSoftThreshold != "default" && expectedSurplus < int64(s.config().expectedSurplusSoftThreshold) { + log.Warn("expected surplus is below soft threshold", "value", expectedSurplus, "threshold", s.config().expectedSurplusSoftThreshold) + } + return expectedSurplus, nil +} + func (s *Sequencer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) + + if (s.config().ExpectedSurplusHardThreshold != "default" || s.config().ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil { + return errors.New("expected surplus soft/hard thresholds are enabled but l1Reader is nil") + } + if s.l1Reader != nil { initialBlockNr := atomic.LoadUint64(&s.l1BlockNumber) if initialBlockNr == 0 { return errors.New("sequencer not initialized") } + expectedSurplus, err := s.updateExpectedSurplus(ctxIn) + if err != nil { + if s.config().ExpectedSurplusHardThreshold != "default" { + return fmt.Errorf("expected-surplus-hard-threshold is enabled but error fetching initial expected surplus value: %w", err) + } + log.Error("expected-surplus-soft-threshold is enabled but error fetching initial expected surplus value", "err", err) + } else { + s.expectedSurplus = expectedSurplus + s.expectedSurplusUpdated = true + } + s.CallIteratively(func(ctx context.Context) time.Duration { + expectedSurplus, err := s.updateExpectedSurplus(ctxIn) + s.expectedSurplusMutex.Lock() + defer s.expectedSurplusMutex.Unlock() + if err != nil { + s.expectedSurplusUpdated = false + log.Error("expected surplus soft/hard thresholds are enabled but unable to fetch latest expected surplus, retrying", "err", err) + return 0 + } + s.expectedSurplusUpdated = true + s.expectedSurplus = expectedSurplus + return 5 * time.Second + }) + headerChan, cancel := s.l1Reader.Subscribe(false) s.LaunchThread(func(ctx context.Context) { diff --git a/execution/interface.go b/execution/interface.go index b0817aeac4..7540a09210 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -54,6 +54,7 @@ type ExecutionSequencer interface { ForwardTo(url string) error SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) + GetL1GasPriceEstimate() (uint64, error) } type FullExecutionClient interface { @@ -91,6 +92,9 @@ type ConsensusInfo interface { type ConsensusSequencer interface { WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error ExpectChosenSequencer() error + CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) + BacklogL1GasCharged() uint64 + BacklogCallDataUnits() uint64 } type FullConsensusClient interface { diff --git a/precompiles/ArbGasInfo.go b/precompiles/ArbGasInfo.go index cda5350a4a..cb0045c49f 100644 --- a/precompiles/ArbGasInfo.go +++ b/precompiles/ArbGasInfo.go @@ -202,20 +202,7 @@ func (con ArbGasInfo) GetL1PricingSurplus(c ctx, evm mech) (*big.Int, error) { return con._preversion10_GetL1PricingSurplus(c, evm) } ps := c.State.L1PricingState() - fundsDueForRefunds, err := ps.BatchPosterTable().TotalFundsDue() - if err != nil { - return nil, err - } - fundsDueForRewards, err := ps.FundsDueForRewards() - if err != nil { - return nil, err - } - haveFunds, err := ps.L1FeesAvailable() - if err != nil { - return nil, err - } - needFunds := arbmath.BigAdd(fundsDueForRefunds, fundsDueForRewards) - return arbmath.BigSub(haveFunds, needFunds), nil + return ps.GetL1PricingSurplus() } func (con ArbGasInfo) _preversion10_GetL1PricingSurplus(c ctx, evm mech) (*big.Int, error) {