From 87f08dcd2a4bd198d4b27663f8d5b77c58e14811 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 6 Mar 2024 11:16:13 -0600 Subject: [PATCH 01/11] Add metrics to track L1 price in batch poster --- arbnode/batch_poster.go | 44 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 14d5affa08..93dbb6ea8c 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -52,8 +52,13 @@ import ( ) var ( - batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/balanceether", nil) - batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/balanceether", nil) + batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/balanceether", nil) + batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/balanceether", nil) + baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) + blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil) + blockGasUsedPerBlockGasLimitGauge = metrics.NewRegisteredGaugeFloat64("arb/batchposter/blockgasusedperblockgaslimit", nil) + blobGasUsedPerBlobGasLimitGauge = metrics.NewRegisteredGaugeFloat64("arb/batchposter/blobgasusedperblobgaslimit", nil) + suggestedTipCapGauge = metrics.NewRegisteredGauge("arb/batchposter/suggestedtipcap", nil) usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32)) blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) @@ -467,6 +472,40 @@ func (b *BatchPoster) checkReverts(ctx context.Context, to int64) (bool, error) return false, nil } +func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { + headerCh, unsubscribe := b.l1Reader.Subscribe(false) + defer unsubscribe() + + for { + select { + case h, ok := <-headerCh: + if !ok { + log.Info("L1 headers channel checking for l1 price data has been closed") + return + } + baseFeeGauge.Update(h.BaseFee.Int64()) + if h.BlobGasUsed != nil { + if h.ExcessBlobGas != nil { + blobFee := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) + blobFeeGauge.Update(blobFee.Int64()) + } + blobGasUsedPerBlobGasLimitGauge.Update(float64(*h.BlobGasUsed) / params.MaxBlobGasPerBlock) + } + blockGasUsedPerBlockGasLimitGauge.Update(float64(h.GasUsed) / float64(h.GasLimit)) + suggestedTipCap, err := b.l1Reader.Client().SuggestGasTipCap(ctx) + if err != nil { + log.Error("unable to fetch suggestedTipCap from l1 client to update arb/batchposter/suggestedtipcap metric", "err", err) + } else { + suggestedTipCapGauge.Update(suggestedTipCap.Int64()) + } + // We poll for new headers every five seconds to get accurate reporting of these metrics + time.Sleep(5 * time.Second) + case <-ctx.Done(): + return + } + } +} + // pollForReverts runs a gouroutine that listens to l1 block headers, checks // if any transaction made by batch poster was reverted. func (b *BatchPoster) pollForReverts(ctx context.Context) { @@ -1289,6 +1328,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) b.LaunchThread(b.pollForReverts) + b.LaunchThread(b.pollForL1PriceData) commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0) exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute) storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute) From a7a7e1270870faeab300ca0632964cdd90ad506e Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 11 Mar 2024 17:08:15 -0500 Subject: [PATCH 02/11] Track pricing data in sequencer and batch poster. Add option to decline incoming transactions during extreme l1 pricing conditions --- arbnode/batch_poster.go | 28 ++++- arbnode/transaction_streamer.go | 106 +++++++++++++++++ arbos/l1pricing/l1pricing.go | 17 +++ execution/gethexec/executionengine.go | 51 ++++++++ execution/gethexec/node.go | 4 + execution/gethexec/sequencer.go | 160 +++++++++++++++++++++----- execution/interface.go | 4 + precompiles/ArbGasInfo.go | 15 +-- 8 files changed, 342 insertions(+), 43 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 93dbb6ea8c..f026ecc03f 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -56,6 +56,9 @@ var ( batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/balanceether", nil) baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil) + l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil) + l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gaspriceestimate", nil) + latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil) blockGasUsedPerBlockGasLimitGauge = metrics.NewRegisteredGaugeFloat64("arb/batchposter/blockgasusedperblockgaslimit", nil) blobGasUsedPerBlobGasLimitGauge = metrics.NewRegisteredGaugeFloat64("arb/batchposter/blobgasusedperblobgaslimit", nil) suggestedTipCapGauge = metrics.NewRegisteredGauge("arb/batchposter/suggestedtipcap", nil) @@ -107,6 +110,13 @@ type BatchPoster struct { nextRevertCheckBlock int64 // the last parent block scanned for reverting batches accessList func(SequencerInboxAccs, AfterDelayedMessagesRead int) types.AccessList + + pricingMetrics l1PricingMetrics +} + +type l1PricingMetrics struct { + l1GasPrice uint64 + l1GasPriceEstimate uint64 } type l1BlockBound int @@ -484,10 +494,16 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { return } baseFeeGauge.Update(h.BaseFee.Int64()) + l1GasPrice := h.BaseFee.Uint64() if h.BlobGasUsed != nil { if h.ExcessBlobGas != nil { - blobFee := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) - blobFeeGauge.Update(blobFee.Int64()) + blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) + blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) + blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) + blobFeeGauge.Update(blobFeePerByte.Int64()) + if l1GasPrice > blobFeePerByte.Uint64()/16 { + l1GasPrice = blobFeePerByte.Uint64() / 16 + } } blobGasUsedPerBlobGasLimitGauge.Update(float64(*h.BlobGasUsed) / params.MaxBlobGasPerBlock) } @@ -498,6 +514,11 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { } else { suggestedTipCapGauge.Update(suggestedTipCap.Int64()) } + l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice() + b.pricingMetrics.l1GasPrice = l1GasPrice + b.pricingMetrics.l1GasPriceEstimate = l1GasPriceEstimate + l1GasPriceGauge.Update(int64(l1GasPrice)) + l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate)) // We poll for new headers every five seconds to get accurate reporting of these metrics time.Sleep(5 * time.Second) case <-ctx.Done(): @@ -1259,6 +1280,9 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "numBlobs", len(kzgBlobs), ) + surplus := arbmath.SaturatingSub(int64(b.pricingMetrics.l1GasPrice), int64(b.pricingMetrics.l1GasPriceEstimate)) * int64(len(sequencerMsg)*16) + latestBatchSurplusGauge.Update(surplus) + recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 postedMessages := b.building.msgCount - batchPosition.MessageCount b.messagesPerBatch.Update(uint64(postedMessages)) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 7e9cf1dbad..7145184dc6 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -68,6 +68,8 @@ type TransactionStreamer struct { broadcastServer *broadcaster.Broadcaster inboxReader *InboxReader delayedBridge *DelayedBridge + + cachedL1PriceData *L1PriceData } type TransactionStreamerConfig struct { @@ -112,6 +114,9 @@ func NewTransactionStreamer( broadcastServer: broadcastServer, fatalErrChan: fatalErrChan, config: config, + cachedL1PriceData: &L1PriceData{ + msgToL1PriceData: []L1PriceDataOfMsg{}, + }, } streamer.exec.SetTransactionStreamer(streamer) err := streamer.cleanupInconsistentState() @@ -121,6 +126,105 @@ func NewTransactionStreamer( return streamer, nil } +type L1PriceDataOfMsg struct { + callDataUnits uint64 + cummulativeCallDataUnits uint64 + l1GasCharged uint64 + cummulativeL1GasCharged uint64 +} + +type L1PriceData struct { + startOfL1PriceDataCache arbutil.MessageIndex + endOfL1PriceDataCache arbutil.MessageIndex + msgToL1PriceData []L1PriceDataOfMsg + currentEstimateOfL1GasPrice uint64 +} + +func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { + currentEstimate, err := s.exec.GetL1GasPriceEstimate() + if err != nil { + log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err) + } else { + s.cachedL1PriceData.currentEstimateOfL1GasPrice = currentEstimate + } + return s.cachedL1PriceData.currentEstimateOfL1GasPrice +} + +func (s *TransactionStreamer) BacklogCallDataUnits() uint64 { + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits + + s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits) +} + +func (s *TransactionStreamer) BacklogL1GasCharged() uint64 { + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged + + s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) +} + +func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) { + if to < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("trying to trim older cache which doesnt exist anymore") + } else if to >= s.cachedL1PriceData.endOfL1PriceDataCache { + s.cachedL1PriceData.startOfL1PriceDataCache = 0 + s.cachedL1PriceData.endOfL1PriceDataCache = 0 + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{} + } else { + newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1 + s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:] + s.cachedL1PriceData.startOfL1PriceDataCache = to + 1 + } +} + +func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { + resetCache := func() { + s.cachedL1PriceData.startOfL1PriceDataCache = seqNum + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: l1GasCharged, + }} + } + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 || + s.cachedL1PriceData.startOfL1PriceDataCache == 0 || + s.cachedL1PriceData.endOfL1PriceDataCache == 0 || + arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 { + resetCache() + return + } + if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 { + if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 { + log.Info("message position higher then current end of l1 price data cache, resetting cache to this message") + resetCache() + } else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("message position lower than start of l1 price data cache, ignoring") + } else { + log.Info("message position already seen in l1 price data cache, ignoring") + } + } else { + cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits + cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged + s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged, + }) + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + } +} + // Encodes a uint64 as bytes in a lexically sortable manner for database iteration. // Generally this is only used for database keys, which need sorted. // A shorter RLP encoding is usually used for database values. @@ -563,6 +667,8 @@ func endBatch(batch ethdb.Batch) error { func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { if messagesAreConfirmed { + // Trim confirmed messages from l1pricedataCache + s.TrimCache(pos + arbutil.MessageIndex(len(messages))) s.reorgMutex.RLock() dups, _, _, err := s.countDuplicateMessages(pos, messages, nil) s.reorgMutex.RUnlock() diff --git a/arbos/l1pricing/l1pricing.go b/arbos/l1pricing/l1pricing.go index f2312c46d4..9e00eeb581 100644 --- a/arbos/l1pricing/l1pricing.go +++ b/arbos/l1pricing/l1pricing.go @@ -195,6 +195,23 @@ func (ps *L1PricingState) SetUnitsSinceUpdate(units uint64) error { return ps.unitsSinceUpdate.Set(units) } +func (ps *L1PricingState) GetL1PricingSurplus() (*big.Int, error) { + fundsDueForRefunds, err := ps.BatchPosterTable().TotalFundsDue() + if err != nil { + return nil, err + } + fundsDueForRewards, err := ps.FundsDueForRewards() + if err != nil { + return nil, err + } + haveFunds, err := ps.L1FeesAvailable() + if err != nil { + return nil, err + } + needFunds := arbmath.BigAdd(fundsDueForRefunds, fundsDueForRewards) + return arbmath.BigSub(haveFunds, needFunds), nil +} + func (ps *L1PricingState) LastSurplus() (*big.Int, error) { return ps.lastSurplus.Get() } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index a662de3621..2cbb486fe8 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -365,6 +365,8 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. return nil, err } + s.cacheL1PriceDataOfMsg(pos, receipts, block) + return block, nil } @@ -505,6 +507,55 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } +func (s *ExecutionEngine) GetL1GasPriceEstimate() (uint64, error) { + bc := s.bc + latestHeader := bc.CurrentBlock() + latestState, err := bc.StateAt(latestHeader.Root) + if err != nil { + return 0, errors.New("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + } + arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) + if err != nil { + return 0, errors.New("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + } + l2EstimateL1GasPrice, err := arbState.L1PricingState().PricePerUnit() + if err != nil { + return 0, errors.New("error fetching l2 Estimate of L1 GasPrice") + } + return l2EstimateL1GasPrice.Uint64(), nil +} + +func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) { + bc := s.bc + latestHeader := bc.CurrentBlock() + latestState, err := bc.StateAt(latestHeader.Root) + if err != nil { + return 0, errors.New("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + } + arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) + if err != nil { + return 0, errors.New("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + } + surplus, err := arbState.L1PricingState().GetL1PricingSurplus() + if err != nil { + return 0, errors.New("error fetching l2 Estimate of L1 GasPrice") + } + return surplus.Int64(), nil +} + +func (s *ExecutionEngine) cacheL1PriceDataOfMsg(num arbutil.MessageIndex, receipts types.Receipts, block *types.Block) { + var gasUsedForL1 uint64 + for i := 1; i < len(receipts); i++ { + gasUsedForL1 += receipts[i].GasUsedForL1 + } + gasChargedForL1 := gasUsedForL1 * block.BaseFee().Uint64() + var callDataUnits uint64 + for _, tx := range block.Transactions() { + callDataUnits += tx.CalldataUnits + } + s.streamer.CacheL1PriceDataOfMsg(num, callDataUnits, gasChargedForL1) +} + // DigestMessage is used to create a block by executing msg against the latest state and storing it. // Also, while creating a block by executing msg against the latest state, // in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 80c2939af6..ca4fb19c6d 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -253,6 +253,10 @@ func CreateExecutionNode( } +func (n *ExecutionNode) GetL1GasPriceEstimate() (uint64, error) { + return n.ExecEngine.GetL1GasPriceEstimate() +} + func (n *ExecutionNode) Initialize(ctx context.Context, arbnode interface{}, sync arbitrum.SyncProgressBackend) error { n.ArbInterface.Initialize(arbnode) err := n.Backend.Start() diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 5db38cbb4d..ced1aa9e31 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -10,6 +10,7 @@ import ( "math" "math/big" "runtime/debug" + "strconv" "strings" "sync" "sync/atomic" @@ -25,10 +26,12 @@ import ( "github.com/ethereum/go-ethereum/arbitrum" "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -51,21 +54,30 @@ var ( successfulBlocksCounter = metrics.NewRegisteredCounter("arb/sequencer/block/successful", nil) conditionalTxRejectedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/condtionaltx/rejected", nil) conditionalTxAcceptedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/condtionaltx/accepted", nil) + l1GasPriceGauge = metrics.NewRegisteredGauge("arb/sequencer/l1gasprice", nil) + callDataUnitsBacklogGauge = metrics.NewRegisteredGauge("arb/sequencer/calldataunitsbacklog", nil) + unusedL1GasChargeGauge = metrics.NewRegisteredGauge("arb/sequencer/unusedl1gascharge", nil) + currentSurplusGauge = metrics.NewRegisteredGauge("arb/sequencer/currentsurplus", nil) + expectedSurplusGauge = metrics.NewRegisteredGauge("arb/sequencer/expectedsurplus", nil) ) type SequencerConfig struct { - Enable bool `koanf:"enable"` - MaxBlockSpeed time.Duration `koanf:"max-block-speed" reload:"hot"` - MaxRevertGasReject uint64 `koanf:"max-revert-gas-reject" reload:"hot"` - MaxAcceptableTimestampDelta time.Duration `koanf:"max-acceptable-timestamp-delta" reload:"hot"` - SenderWhitelist string `koanf:"sender-whitelist"` - Forwarder ForwarderConfig `koanf:"forwarder"` - QueueSize int `koanf:"queue-size"` - QueueTimeout time.Duration `koanf:"queue-timeout" reload:"hot"` - NonceCacheSize int `koanf:"nonce-cache-size" reload:"hot"` - MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` - NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` - NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + Enable bool `koanf:"enable"` + MaxBlockSpeed time.Duration `koanf:"max-block-speed" reload:"hot"` + MaxRevertGasReject uint64 `koanf:"max-revert-gas-reject" reload:"hot"` + MaxAcceptableTimestampDelta time.Duration `koanf:"max-acceptable-timestamp-delta" reload:"hot"` + SenderWhitelist string `koanf:"sender-whitelist"` + Forwarder ForwarderConfig `koanf:"forwarder"` + QueueSize int `koanf:"queue-size"` + QueueTimeout time.Duration `koanf:"queue-timeout" reload:"hot"` + NonceCacheSize int `koanf:"nonce-cache-size" reload:"hot"` + MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` + NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` + NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"` + ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"` + expectedSurplusSoftThreshold int + expectedSurplusHardThreshold int } func (c *SequencerConfig) Validate() error { @@ -78,6 +90,20 @@ func (c *SequencerConfig) Validate() error { return fmt.Errorf("sequencer sender whitelist entry \"%v\" is not a valid address", address) } } + var err error + if c.ExpectedSurplusSoftThreshold != "default" { + if c.expectedSurplusSoftThreshold, err = strconv.Atoi(c.ExpectedSurplusSoftThreshold); err != nil { + return fmt.Errorf("invalid expected-surplus-soft-threshold value provided in batchposter config %w", err) + } + } + if c.ExpectedSurplusHardThreshold != "default" { + if c.expectedSurplusHardThreshold, err = strconv.Atoi(c.ExpectedSurplusHardThreshold); err != nil { + return fmt.Errorf("invalid expected-surplus-hard-threshold value provided in batchposter config %w", err) + } + } + if c.expectedSurplusSoftThreshold < c.expectedSurplusHardThreshold { + return errors.New("expected-surplus-soft-threshold cannot be lower than expected-surplus-hard-threshold") + } return nil } @@ -94,24 +120,28 @@ var DefaultSequencerConfig = SequencerConfig{ NonceCacheSize: 1024, // 95% of the default batch poster limit, leaving 5KB for headers and such // This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go - MaxTxDataSize: 95000, - NonceFailureCacheSize: 1024, - NonceFailureCacheExpiry: time.Second, + MaxTxDataSize: 95000, + NonceFailureCacheSize: 1024, + NonceFailureCacheExpiry: time.Second, + ExpectedSurplusSoftThreshold: "default", + ExpectedSurplusHardThreshold: "default", } var TestSequencerConfig = SequencerConfig{ - Enable: true, - MaxBlockSpeed: time.Millisecond * 10, - MaxRevertGasReject: params.TxGas + 10000, - MaxAcceptableTimestampDelta: time.Hour, - SenderWhitelist: "", - Forwarder: DefaultTestForwarderConfig, - QueueSize: 128, - QueueTimeout: time.Second * 5, - NonceCacheSize: 4, - MaxTxDataSize: 95000, - NonceFailureCacheSize: 1024, - NonceFailureCacheExpiry: time.Second, + Enable: true, + MaxBlockSpeed: time.Millisecond * 10, + MaxRevertGasReject: params.TxGas + 10000, + MaxAcceptableTimestampDelta: time.Hour, + SenderWhitelist: "", + Forwarder: DefaultTestForwarderConfig, + QueueSize: 128, + QueueTimeout: time.Second * 5, + NonceCacheSize: 4, + MaxTxDataSize: 95000, + NonceFailureCacheSize: 1024, + NonceFailureCacheExpiry: time.Second, + ExpectedSurplusSoftThreshold: "default", + ExpectedSurplusHardThreshold: "default", } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -127,6 +157,8 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept") f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor") f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high") + f.String(prefix+".expected-surplus-soft-threshold", DefaultSequencerConfig.ExpectedSurplusSoftThreshold, "if expected surplus is lower than this value, warnings are posted") + f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, no new batches are posted") } type txQueueItem struct { @@ -291,6 +323,9 @@ type Sequencer struct { activeMutex sync.Mutex pauseChan chan struct{} forwarder *TxForwarder + + expectedSurplus int64 + expectedSurplusHardCheck bool } func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) { @@ -364,6 +399,10 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context } func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { + if s.expectedSurplusHardCheck && s.expectedSurplus < int64(s.config().expectedSurplusHardThreshold) { + return errors.New("currently not accepting transactions due to expected surplus being below threshold") + } + sequencerBacklogGauge.Inc(1) defer sequencerBacklogGauge.Dec(1) @@ -944,14 +983,81 @@ func (s *Sequencer) Initialize(ctx context.Context) error { return nil } +var ( + usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32)) + blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) +) + +func (s *Sequencer) updateExpectedSurplus(ctx context.Context) error { + header, err := s.l1Reader.LastHeader(ctx) + if err != nil { + return fmt.Errorf("error encountered getting latest header from l1reader while updating expectedSurplus: %w", err) + } + l1GasPrice := header.BaseFee.Uint64() + if header.BlobGasUsed != nil { + if header.ExcessBlobGas != nil { + blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*header.ExcessBlobGas, *header.BlobGasUsed)) + blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) + blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) + if l1GasPrice > blobFeePerByte.Uint64()/16 { + l1GasPrice = blobFeePerByte.Uint64() / 16 + } + } + } + surplus, err := s.execEngine.getL1PricingSurplus() + if err != nil { + return fmt.Errorf("error encountered getting l1 pricing surplus while updating expectedSurplus: %w", err) + } + backlogL1GasCharged := int64(s.execEngine.streamer.BacklogL1GasCharged()) + backlogCallDataUnits := int64(s.execEngine.streamer.BacklogCallDataUnits()) + s.expectedSurplus = int64(surplus) + backlogL1GasCharged - backlogCallDataUnits*int64(l1GasPrice) + // update metrics + l1GasPriceGauge.Update(int64(l1GasPrice)) + callDataUnitsBacklogGauge.Update(backlogCallDataUnits) + unusedL1GasChargeGauge.Update(backlogL1GasCharged) + currentSurplusGauge.Update(surplus) + expectedSurplusGauge.Update(s.expectedSurplus) + if s.config().ExpectedSurplusSoftThreshold != "default" && s.expectedSurplus < int64(s.config().expectedSurplusSoftThreshold) { + log.Warn("expected surplus is below soft threshold", "value", s.expectedSurplus, "threshold", s.config().expectedSurplusSoftThreshold) + } + return nil +} + func (s *Sequencer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) + + if (s.config().ExpectedSurplusHardThreshold != "default" || s.config().ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil { + return errors.New("expected surplus soft/hard thresholds are enabled but l1Reader is nil") + } + initialExpectedSurplusHardCheck := s.l1Reader != nil && s.config().ExpectedSurplusHardThreshold != "default" + s.expectedSurplusHardCheck = initialExpectedSurplusHardCheck + if s.l1Reader != nil { initialBlockNr := atomic.LoadUint64(&s.l1BlockNumber) if initialBlockNr == 0 { return errors.New("sequencer not initialized") } + if err := s.updateExpectedSurplus(ctxIn); err != nil { + if s.config().ExpectedSurplusHardThreshold != "default" { + return fmt.Errorf("expected-surplus-hard-threshold is enabled but error fetching initial expected surplus value: %w", err) + } + log.Error("error fetching initial expected surplus value", "err", err) + } + s.CallIteratively(func(ctx context.Context) time.Duration { + if err := s.updateExpectedSurplus(ctx); err != nil { + if initialExpectedSurplusHardCheck { + log.Error("expected-surplus-hard-threshold is enabled but unable to fetch latest expected surplus. Disabling expected-surplus-hard-threshold check and retrying immediately", "err", err) + s.expectedSurplusHardCheck = false + } else { + log.Error("expected-surplus-soft-threshold is enabled but unable to fetch latest expected surplus, retrying", "err", err) + } + return 0 + } + s.expectedSurplusHardCheck = initialExpectedSurplusHardCheck + return 5 * time.Second + }) + headerChan, cancel := s.l1Reader.Subscribe(false) s.LaunchThread(func(ctx context.Context) { diff --git a/execution/interface.go b/execution/interface.go index 2cbbf550ad..361fbc8ad5 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -55,6 +55,7 @@ type ExecutionSequencer interface { SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error NextDelayedMessageNumber() (uint64, error) SetTransactionStreamer(streamer TransactionStreamer) + GetL1GasPriceEstimate() (uint64, error) } type FullExecutionClient interface { @@ -82,4 +83,7 @@ type TransactionStreamer interface { BatchFetcher WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error ExpectChosenSequencer() error + CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) + BacklogL1GasCharged() uint64 + BacklogCallDataUnits() uint64 } diff --git a/precompiles/ArbGasInfo.go b/precompiles/ArbGasInfo.go index cda5350a4a..cb0045c49f 100644 --- a/precompiles/ArbGasInfo.go +++ b/precompiles/ArbGasInfo.go @@ -202,20 +202,7 @@ func (con ArbGasInfo) GetL1PricingSurplus(c ctx, evm mech) (*big.Int, error) { return con._preversion10_GetL1PricingSurplus(c, evm) } ps := c.State.L1PricingState() - fundsDueForRefunds, err := ps.BatchPosterTable().TotalFundsDue() - if err != nil { - return nil, err - } - fundsDueForRewards, err := ps.FundsDueForRewards() - if err != nil { - return nil, err - } - haveFunds, err := ps.L1FeesAvailable() - if err != nil { - return nil, err - } - needFunds := arbmath.BigAdd(fundsDueForRefunds, fundsDueForRewards) - return arbmath.BigSub(haveFunds, needFunds), nil + return ps.GetL1PricingSurplus() } func (con ArbGasInfo) _preversion10_GetL1PricingSurplus(c ctx, evm mech) (*big.Int, error) { From 306e4b40096c96fadcbeb8259b061dd2fa78cd7e Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 11 Mar 2024 21:35:24 -0500 Subject: [PATCH 03/11] refactor --- arbnode/batch_poster.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index f026ecc03f..cf87b205d3 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -519,8 +519,6 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { b.pricingMetrics.l1GasPriceEstimate = l1GasPriceEstimate l1GasPriceGauge.Update(int64(l1GasPrice)) l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate)) - // We poll for new headers every five seconds to get accurate reporting of these metrics - time.Sleep(5 * time.Second) case <-ctx.Done(): return } From ec909b2e7dfb4dc02f72dc6cdf1f5d2100bb5fdc Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 11 Mar 2024 21:37:21 -0500 Subject: [PATCH 04/11] refactor --- arbnode/batch_poster.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 93dbb6ea8c..ca3fee93ce 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -498,8 +498,6 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { } else { suggestedTipCapGauge.Update(suggestedTipCap.Int64()) } - // We poll for new headers every five seconds to get accurate reporting of these metrics - time.Sleep(5 * time.Second) case <-ctx.Done(): return } From 6100dd785bf6b72f678ed9d88253f36845a2c525 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 12 Mar 2024 09:51:19 -0500 Subject: [PATCH 05/11] fix race --- arbnode/transaction_streamer.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 7145184dc6..c2ba5aaf6f 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -69,7 +69,8 @@ type TransactionStreamer struct { inboxReader *InboxReader delayedBridge *DelayedBridge - cachedL1PriceData *L1PriceData + cachedL1PriceDataMutex sync.RWMutex + cachedL1PriceData *L1PriceData } type TransactionStreamerConfig struct { @@ -141,6 +142,9 @@ type L1PriceData struct { } func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { + s.cachedL1PriceDataMutex.Lock() + defer s.cachedL1PriceDataMutex.Unlock() + currentEstimate, err := s.exec.GetL1GasPriceEstimate() if err != nil { log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err) @@ -151,6 +155,9 @@ func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { } func (s *TransactionStreamer) BacklogCallDataUnits() uint64 { + s.cachedL1PriceDataMutex.RLock() + defer s.cachedL1PriceDataMutex.RUnlock() + size := len(s.cachedL1PriceData.msgToL1PriceData) if size == 0 { return 0 @@ -161,6 +168,9 @@ func (s *TransactionStreamer) BacklogCallDataUnits() uint64 { } func (s *TransactionStreamer) BacklogL1GasCharged() uint64 { + s.cachedL1PriceDataMutex.RLock() + defer s.cachedL1PriceDataMutex.RUnlock() + size := len(s.cachedL1PriceData.msgToL1PriceData) if size == 0 { return 0 @@ -171,6 +181,9 @@ func (s *TransactionStreamer) BacklogL1GasCharged() uint64 { } func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) { + s.cachedL1PriceDataMutex.Lock() + defer s.cachedL1PriceDataMutex.Unlock() + if to < s.cachedL1PriceData.startOfL1PriceDataCache { log.Info("trying to trim older cache which doesnt exist anymore") } else if to >= s.cachedL1PriceData.endOfL1PriceDataCache { @@ -185,6 +198,9 @@ func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) { } func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) { + s.cachedL1PriceDataMutex.Lock() + defer s.cachedL1PriceDataMutex.Unlock() + resetCache := func() { s.cachedL1PriceData.startOfL1PriceDataCache = seqNum s.cachedL1PriceData.endOfL1PriceDataCache = seqNum From 61eedbd7c8b5c69ae8cdeaaa2b716a0f257cbd80 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 12 Mar 2024 12:25:17 -0500 Subject: [PATCH 06/11] fix race --- arbnode/batch_poster.go | 16 ++++----- execution/gethexec/sequencer.go | 57 +++++++++++++++++++-------------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index cf87b205d3..303b244b3c 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -110,13 +110,6 @@ type BatchPoster struct { nextRevertCheckBlock int64 // the last parent block scanned for reverting batches accessList func(SequencerInboxAccs, AfterDelayedMessagesRead int) types.AccessList - - pricingMetrics l1PricingMetrics -} - -type l1PricingMetrics struct { - l1GasPrice uint64 - l1GasPriceEstimate uint64 } type l1BlockBound int @@ -515,8 +508,6 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { suggestedTipCapGauge.Update(suggestedTipCap.Int64()) } l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice() - b.pricingMetrics.l1GasPrice = l1GasPrice - b.pricingMetrics.l1GasPriceEstimate = l1GasPriceEstimate l1GasPriceGauge.Update(int64(l1GasPrice)) l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate)) case <-ctx.Done(): @@ -1278,7 +1269,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) "numBlobs", len(kzgBlobs), ) - surplus := arbmath.SaturatingSub(int64(b.pricingMetrics.l1GasPrice), int64(b.pricingMetrics.l1GasPriceEstimate)) * int64(len(sequencerMsg)*16) + surplus := arbmath.SaturatingMul( + arbmath.SaturatingSub( + l1GasPriceGauge.Snapshot().Value(), + l1GasPriceEstimateGauge.Snapshot().Value()), + int64(len(sequencerMsg)*16), + ) latestBatchSurplusGauge.Update(surplus) recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3 diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index ced1aa9e31..a97c796fab 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -324,8 +324,9 @@ type Sequencer struct { pauseChan chan struct{} forwarder *TxForwarder - expectedSurplus int64 - expectedSurplusHardCheck bool + expectedSurplusMutex sync.RWMutex + expectedSurplus int64 + expectedSurplusUpdated bool } func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) { @@ -399,8 +400,14 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context } func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { - if s.expectedSurplusHardCheck && s.expectedSurplus < int64(s.config().expectedSurplusHardThreshold) { - return errors.New("currently not accepting transactions due to expected surplus being below threshold") + // Only try to acquire Rlock and check for hard threshold if l1reader is not nil + // And hard threshold was enabled, this prevents spamming of read locks when not needed + if s.l1Reader != nil && s.config().ExpectedSurplusHardThreshold != "default" { + s.expectedSurplusMutex.RLock() + if s.expectedSurplusUpdated && s.expectedSurplus < int64(s.config().expectedSurplusHardThreshold) { + return errors.New("currently not accepting transactions due to expected surplus being below threshold") + } + s.expectedSurplusMutex.RUnlock() } sequencerBacklogGauge.Inc(1) @@ -988,10 +995,10 @@ var ( blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) ) -func (s *Sequencer) updateExpectedSurplus(ctx context.Context) error { +func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) { header, err := s.l1Reader.LastHeader(ctx) if err != nil { - return fmt.Errorf("error encountered getting latest header from l1reader while updating expectedSurplus: %w", err) + return 0, fmt.Errorf("error encountered getting latest header from l1reader while updating expectedSurplus: %w", err) } l1GasPrice := header.BaseFee.Uint64() if header.BlobGasUsed != nil { @@ -1006,21 +1013,21 @@ func (s *Sequencer) updateExpectedSurplus(ctx context.Context) error { } surplus, err := s.execEngine.getL1PricingSurplus() if err != nil { - return fmt.Errorf("error encountered getting l1 pricing surplus while updating expectedSurplus: %w", err) + return 0, fmt.Errorf("error encountered getting l1 pricing surplus while updating expectedSurplus: %w", err) } backlogL1GasCharged := int64(s.execEngine.streamer.BacklogL1GasCharged()) backlogCallDataUnits := int64(s.execEngine.streamer.BacklogCallDataUnits()) - s.expectedSurplus = int64(surplus) + backlogL1GasCharged - backlogCallDataUnits*int64(l1GasPrice) + expectedSurplus := int64(surplus) + backlogL1GasCharged - backlogCallDataUnits*int64(l1GasPrice) // update metrics l1GasPriceGauge.Update(int64(l1GasPrice)) callDataUnitsBacklogGauge.Update(backlogCallDataUnits) unusedL1GasChargeGauge.Update(backlogL1GasCharged) currentSurplusGauge.Update(surplus) - expectedSurplusGauge.Update(s.expectedSurplus) - if s.config().ExpectedSurplusSoftThreshold != "default" && s.expectedSurplus < int64(s.config().expectedSurplusSoftThreshold) { - log.Warn("expected surplus is below soft threshold", "value", s.expectedSurplus, "threshold", s.config().expectedSurplusSoftThreshold) + expectedSurplusGauge.Update(expectedSurplus) + if s.config().ExpectedSurplusSoftThreshold != "default" && expectedSurplus < int64(s.config().expectedSurplusSoftThreshold) { + log.Warn("expected surplus is below soft threshold", "value", expectedSurplus, "threshold", s.config().expectedSurplusSoftThreshold) } - return nil + return expectedSurplus, nil } func (s *Sequencer) Start(ctxIn context.Context) error { @@ -1029,8 +1036,6 @@ func (s *Sequencer) Start(ctxIn context.Context) error { if (s.config().ExpectedSurplusHardThreshold != "default" || s.config().ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil { return errors.New("expected surplus soft/hard thresholds are enabled but l1Reader is nil") } - initialExpectedSurplusHardCheck := s.l1Reader != nil && s.config().ExpectedSurplusHardThreshold != "default" - s.expectedSurplusHardCheck = initialExpectedSurplusHardCheck if s.l1Reader != nil { initialBlockNr := atomic.LoadUint64(&s.l1BlockNumber) @@ -1038,23 +1043,27 @@ func (s *Sequencer) Start(ctxIn context.Context) error { return errors.New("sequencer not initialized") } - if err := s.updateExpectedSurplus(ctxIn); err != nil { + expectedSurplus, err := s.updateExpectedSurplus(ctxIn) + if err != nil { if s.config().ExpectedSurplusHardThreshold != "default" { return fmt.Errorf("expected-surplus-hard-threshold is enabled but error fetching initial expected surplus value: %w", err) } - log.Error("error fetching initial expected surplus value", "err", err) + log.Error("expected-surplus-soft-threshold is enabled but error fetching initial expected surplus value", "err", err) + } else { + s.expectedSurplus = expectedSurplus + s.expectedSurplusUpdated = true } s.CallIteratively(func(ctx context.Context) time.Duration { - if err := s.updateExpectedSurplus(ctx); err != nil { - if initialExpectedSurplusHardCheck { - log.Error("expected-surplus-hard-threshold is enabled but unable to fetch latest expected surplus. Disabling expected-surplus-hard-threshold check and retrying immediately", "err", err) - s.expectedSurplusHardCheck = false - } else { - log.Error("expected-surplus-soft-threshold is enabled but unable to fetch latest expected surplus, retrying", "err", err) - } + expectedSurplus, err := s.updateExpectedSurplus(ctxIn) + s.expectedSurplusMutex.Lock() + defer s.expectedSurplusMutex.Unlock() + if err != nil { + s.expectedSurplusUpdated = false + log.Error("expected surplus soft/hard thresholds are enabled but unable to fetch latest expected surplus, retrying", "err", err) return 0 } - s.expectedSurplusHardCheck = initialExpectedSurplusHardCheck + s.expectedSurplusUpdated = true + s.expectedSurplus = expectedSurplus return 5 * time.Second }) From aa71e2e7d3931bd6b328b96a9bc96c30cd0a5ddf Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 18 Mar 2024 13:36:17 -0500 Subject: [PATCH 07/11] address PR comments --- arbnode/batch_poster.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 52c189761c..41e6e00f3a 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -52,13 +52,15 @@ import ( ) var ( - batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/balanceether", nil) - batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/balanceether", nil) - baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) - blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil) - blockGasUsedPerBlockGasLimitGauge = metrics.NewRegisteredGaugeFloat64("arb/batchposter/blockgasusedperblockgaslimit", nil) - blobGasUsedPerBlobGasLimitGauge = metrics.NewRegisteredGaugeFloat64("arb/batchposter/blobgasusedperblobgaslimit", nil) - suggestedTipCapGauge = metrics.NewRegisteredGauge("arb/batchposter/suggestedtipcap", nil) + batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/eth", nil) + batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/eth", nil) + baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) + blobFeeGauge = metrics.NewRegisteredHistogram("arb/batchposter/blobfee", nil, metrics.NewBoundedHistogramSample()) + blockGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/used", nil) + blockGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/limit", nil) + blobGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/used", nil) + blobGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/limit", nil) + suggestedTipCapGauge = metrics.NewRegisteredGauge("arb/batchposter/suggestedtipcap", nil) usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32)) blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) @@ -480,6 +482,7 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { headerCh, unsubscribe := b.l1Reader.Subscribe(false) defer unsubscribe() + blobGasLimitGauge.Update(params.MaxBlobGasPerBlock) for { select { case h, ok := <-headerCh: @@ -493,9 +496,10 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { blobFee := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) blobFeeGauge.Update(blobFee.Int64()) } - blobGasUsedPerBlobGasLimitGauge.Update(float64(*h.BlobGasUsed) / params.MaxBlobGasPerBlock) + blobGasUsedGauge.Update(int64(*h.BlobGasUsed)) } - blockGasUsedPerBlockGasLimitGauge.Update(float64(h.GasUsed) / float64(h.GasLimit)) + blockGasUsedGauge.Update(int64(h.GasUsed)) + blockGasLimitGauge.Update(int64(h.GasLimit)) suggestedTipCap, err := b.l1Reader.Client().SuggestGasTipCap(ctx) if err != nil { log.Error("unable to fetch suggestedTipCap from l1 client to update arb/batchposter/suggestedtipcap metric", "err", err) From d66c19dbd8b8d55859957284a642a52841c0f2c4 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 18 Mar 2024 13:41:56 -0500 Subject: [PATCH 08/11] rectify blobFee calculation --- arbnode/batch_poster.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 41e6e00f3a..9a4d962d67 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -493,8 +493,10 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { baseFeeGauge.Update(h.BaseFee.Int64()) if h.BlobGasUsed != nil { if h.ExcessBlobGas != nil { - blobFee := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) - blobFeeGauge.Update(blobFee.Int64()) + blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) + blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) + blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) + blobFeeGauge.Update(blobFeePerByte.Int64()) } blobGasUsedGauge.Update(int64(*h.BlobGasUsed)) } From 71499effacafe3ddcc14524a1ae19f0a86e6c448 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Mon, 18 Mar 2024 14:02:49 -0500 Subject: [PATCH 09/11] fix typos --- execution/gethexec/executionengine.go | 6 +++--- execution/gethexec/sequencer.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 2cbb486fe8..02a8a1145a 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -530,15 +530,15 @@ func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) { latestHeader := bc.CurrentBlock() latestState, err := bc.StateAt(latestHeader.Root) if err != nil { - return 0, errors.New("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + return 0, errors.New("error getting latest statedb while fetching current L1 pricing surplus") } arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) if err != nil { - return 0, errors.New("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + return 0, errors.New("error opening system arbos state while fetching current L1 pricing surplus") } surplus, err := arbState.L1PricingState().GetL1PricingSurplus() if err != nil { - return 0, errors.New("error fetching l2 Estimate of L1 GasPrice") + return 0, errors.New("error fetching current L1 pricing surplus") } return surplus.Int64(), nil } diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index a97c796fab..cf5ec7f68c 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -158,7 +158,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor") f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high") f.String(prefix+".expected-surplus-soft-threshold", DefaultSequencerConfig.ExpectedSurplusSoftThreshold, "if expected surplus is lower than this value, warnings are posted") - f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, no new batches are posted") + f.String(prefix+".expected-surplus-hard-threshold", DefaultSequencerConfig.ExpectedSurplusHardThreshold, "if expected surplus is lower than this value, new incoming transactions will be denied") } type txQueueItem struct { From 158ce7bff8f0517ab6c0111be31f6c8a98277009 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 26 Mar 2024 11:38:23 -0500 Subject: [PATCH 10/11] fix metric name --- arbnode/batch_poster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 9735d0b35a..035b69fcb8 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -56,7 +56,7 @@ var ( batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/eth", nil) batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/eth", nil) baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) - blobFeeGauge = metrics.NewRegisteredHistogram("arb/batchposter/blobfee", nil, metrics.NewBoundedHistogramSample()) + blobFeeHistogram = metrics.NewRegisteredHistogram("arb/batchposter/blobfee", nil, metrics.NewBoundedHistogramSample()) l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil) l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil) latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil) @@ -539,7 +539,7 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) - blobFeeGauge.Update(blobFeePerByte.Int64()) + blobFeeHistogram.Update(blobFeePerByte.Int64()) if l1GasPrice > blobFeePerByte.Uint64()/16 { l1GasPrice = blobFeePerByte.Uint64() / 16 } From 94727331a18abf3d589eb386fa6e41931b8269c5 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 28 Mar 2024 14:00:27 -0400 Subject: [PATCH 11/11] address PR comments --- arbnode/batch_poster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 035b69fcb8..32b6175108 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -56,7 +56,7 @@ var ( batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/eth", nil) batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/eth", nil) baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil) - blobFeeHistogram = metrics.NewRegisteredHistogram("arb/batchposter/blobfee", nil, metrics.NewBoundedHistogramSample()) + blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil) l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil) l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil) latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil) @@ -539,7 +539,7 @@ func (b *BatchPoster) pollForL1PriceData(ctx context.Context) { blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed)) blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob) blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob) - blobFeeHistogram.Update(blobFeePerByte.Int64()) + blobFeeGauge.Update(blobFeePerByte.Int64()) if l1GasPrice > blobFeePerByte.Uint64()/16 { l1GasPrice = blobFeePerByte.Uint64() / 16 }