From e5b62f3cda7b9fd8cf339ad94fd9bf3b772f91e6 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Tue, 21 Nov 2023 22:06:14 +0530 Subject: [PATCH 01/12] Prioritize reading messages from primary feeds over secondaries --- broadcastclients/broadcastclients.go | 56 +++++++++++++++------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index b2824221ea..7fe6ca489f 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -170,45 +170,49 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { bcs.primaryRouter.forwardConfirmationChan <- cs } - // Secondary Feeds - case msg := <-bcs.secondaryRouter.messageChan: - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) - if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { - continue - } - if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok { - continue - } - recentFeedItemsNew[msg.SequenceNumber] = time.Now() - if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { - log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) - } - case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) - if cs == lastConfirmed { - continue - } - lastConfirmed = cs - if bcs.secondaryRouter.forwardConfirmationChan != nil { - bcs.secondaryRouter.forwardConfirmationChan <- cs - } - // Cycle buckets to get rid of old entries case <-recentFeedItemsCleanup.C: recentFeedItemsOld = recentFeedItemsNew recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) - // failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed + // Failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed case <-startSecondaryFeedTimer.C: bcs.startSecondaryFeed(ctx) - // failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary + // Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary case <-primaryFeedIsDownTimer.C: stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME) - // primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed + // Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed case <-stopSecondaryFeedTimer.C: bcs.stopSecondaryFeed() + + default: + select { + // Secondary Feeds + case msg := <-bcs.secondaryRouter.messageChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { + continue + } + if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok { + continue + } + recentFeedItemsNew[msg.SequenceNumber] = time.Now() + if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) + } + case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + if cs == lastConfirmed { + continue + } + lastConfirmed = cs + if bcs.secondaryRouter.forwardConfirmationChan != nil { + bcs.secondaryRouter.forwardConfirmationChan <- cs + } + default: + } } } }) From d1cb68159afe2cf3617335f992ae4beae6143b6f Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Wed, 22 Nov 2023 21:37:27 +0530 Subject: [PATCH 02/12] address PR comments --- broadcastclients/broadcastclients.go | 98 ++++++++++++++++------------ 1 file changed, 58 insertions(+), 40 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 7fe6ca489f..b243c51039 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -140,78 +140,86 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { defer startSecondaryFeedTimer.Stop() defer stopSecondaryFeedTimer.Stop() defer primaryFeedIsDownTimer.Stop() + + msgHandler := func(msg broadcaster.BroadcastFeedMessage, router *Router) error { + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { + return nil + } + if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok { + return nil + } + recentFeedItemsNew[msg.SequenceNumber] = time.Now() + if err := router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + return err + } + return nil + } + confSeqHandler := func(cs arbutil.MessageIndex, router *Router) { + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + if cs == lastConfirmed { + return + } + lastConfirmed = cs + if router.forwardConfirmationChan != nil { + router.forwardConfirmationChan <- cs + } + } + + // Two select statements to prioritize reading messages from primary feeds' channels for { select { case <-ctx.Done(): return - // Primary feeds case msg := <-bcs.primaryRouter.messageChan: - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) - if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { - continue - } - if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok { - continue - } - recentFeedItemsNew[msg.SequenceNumber] = time.Now() - if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + if err := msgHandler(msg, bcs.primaryRouter); err != nil { log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan: - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) - if cs == lastConfirmed { - continue - } - lastConfirmed = cs - if bcs.primaryRouter.forwardConfirmationChan != nil { - bcs.primaryRouter.forwardConfirmationChan <- cs - } - + confSeqHandler(cs, bcs.primaryRouter) // Cycle buckets to get rid of old entries case <-recentFeedItemsCleanup.C: recentFeedItemsOld = recentFeedItemsNew recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) - // Failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed case <-startSecondaryFeedTimer.C: bcs.startSecondaryFeed(ctx) - // Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary case <-primaryFeedIsDownTimer.C: stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME) - // Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed case <-stopSecondaryFeedTimer.C: bcs.stopSecondaryFeed() - default: select { + case <-ctx.Done(): + return // Secondary Feeds case msg := <-bcs.secondaryRouter.messageChan: - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) - if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { - continue - } - if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok { - continue - } - recentFeedItemsNew[msg.SequenceNumber] = time.Now() - if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + if err := msgHandler(msg, bcs.secondaryRouter); err != nil { log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) } case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) - if cs == lastConfirmed { - continue - } - lastConfirmed = cs - if bcs.secondaryRouter.forwardConfirmationChan != nil { - bcs.secondaryRouter.forwardConfirmationChan <- cs + confSeqHandler(cs, bcs.secondaryRouter) + + case msg := <-bcs.primaryRouter.messageChan: + primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) + if err := msgHandler(msg, bcs.primaryRouter); err != nil { + log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } - default: + case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan: + primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) + confSeqHandler(cs, bcs.primaryRouter) + case <-recentFeedItemsCleanup.C: + recentFeedItemsOld = recentFeedItemsNew + recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) + case <-startSecondaryFeedTimer.C: + bcs.startSecondaryFeed(ctx) + case <-primaryFeedIsDownTimer.C: + stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME) } } } @@ -244,6 +252,16 @@ func (bcs *BroadcastClients) stopSecondaryFeed() { bcs.secondaryClients = bcs.secondaryClients[:pos] log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos]) } + // flush the secondary feeds' message and confirmedSequenceNumber channels +f: + for { + select { + case <-bcs.secondaryRouter.messageChan: + case <-bcs.secondaryRouter.confirmedSequenceNumberChan: + default: + break f + } + } } func (bcs *BroadcastClients) StopAndWait() { From d1a4e3db0a8784acc42f8982e8b952c437d9a9f0 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Mon, 27 Nov 2023 22:11:12 +0530 Subject: [PATCH 03/12] code refactor --- broadcastclients/broadcastclients.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index b243c51039..3e048000e0 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -251,15 +251,16 @@ func (bcs *BroadcastClients) stopSecondaryFeed() { bcs.secondaryClients[pos].StopAndWait() bcs.secondaryClients = bcs.secondaryClients[:pos] log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos]) - } - // flush the secondary feeds' message and confirmedSequenceNumber channels -f: - for { - select { - case <-bcs.secondaryRouter.messageChan: - case <-bcs.secondaryRouter.confirmedSequenceNumberChan: - default: - break f + + // flush the secondary feeds' message and confirmedSequenceNumber channels + f: + for { + select { + case <-bcs.secondaryRouter.messageChan: + case <-bcs.secondaryRouter.confirmedSequenceNumberChan: + default: + break f + } } } } From 8d0524de66a64666d6d52c07e493b852bf94459d Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Tue, 28 Nov 2023 21:39:00 +0530 Subject: [PATCH 04/12] Add backup transaction submission RPC URLs to arb1 chain information --- cmd/chaininfo/arbitrum_chain_info.json | 1 + cmd/chaininfo/chain_info.go | 15 ++++++++------- cmd/nitro/nitro.go | 3 +++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/cmd/chaininfo/arbitrum_chain_info.json b/cmd/chaininfo/arbitrum_chain_info.json index 421abea20a..31d25cfdf5 100644 --- a/cmd/chaininfo/arbitrum_chain_info.json +++ b/cmd/chaininfo/arbitrum_chain_info.json @@ -4,6 +4,7 @@ "parent-chain-id": 1, "parent-chain-is-arbitrum": false, "sequencer-url": "https://arb1-sequencer.arbitrum.io/rpc", + "secondary-forwarding-target": "https://arb1-sequencer-fallback-1.arbitrum.io/rpc,https://arb1-sequencer-fallback-2.arbitrum.io/rpc,https://arb1-sequencer-fallback-3.arbitrum.io/rpc,https://arb1-sequencer-fallback-4.arbitrum.io/rpc,https://arb1-sequencer-fallback-5.arbitrum.io/rpc", "feed-url": "wss://arb1-feed.arbitrum.io/feed", "secondary-feed-url": "wss://arb1-delayed-feed.arbitrum.io/feed,wss://arb1-feed-fallback-1.arbitrum.io/feed,wss://arb1-feed-fallback-2.arbitrum.io/feed,wss://arb1-feed-fallback-3.arbitrum.io/feed,wss://arb1-feed-fallback-4.arbitrum.io/feed,wss://arb1-feed-fallback-5.arbitrum.io/feed", "has-genesis-state": true, diff --git a/cmd/chaininfo/chain_info.go b/cmd/chaininfo/chain_info.go index fe499442d2..c0d6b3d005 100644 --- a/cmd/chaininfo/chain_info.go +++ b/cmd/chaininfo/chain_info.go @@ -22,13 +22,14 @@ type ChainInfo struct { ParentChainId uint64 `json:"parent-chain-id"` ParentChainIsArbitrum *bool `json:"parent-chain-is-arbitrum"` // This is the forwarding target to submit transactions to, called the sequencer URL for clarity - SequencerUrl string `json:"sequencer-url"` - FeedUrl string `json:"feed-url"` - SecondaryFeedUrl string `json:"secondary-feed-url"` - DasIndexUrl string `json:"das-index-url"` - HasGenesisState bool `json:"has-genesis-state"` - ChainConfig *params.ChainConfig `json:"chain-config"` - RollupAddresses *RollupAddresses `json:"rollup"` + SequencerUrl string `json:"sequencer-url"` + SecondaryForwardingTarget string `json:"secondary-forwarding-target"` + FeedUrl string `json:"feed-url"` + SecondaryFeedUrl string `json:"secondary-feed-url"` + DasIndexUrl string `json:"das-index-url"` + HasGenesisState bool `json:"has-genesis-state"` + ChainConfig *params.ChainConfig `json:"chain-config"` + RollupAddresses *RollupAddresses `json:"rollup"` } func GetChainConfig(chainId *big.Int, chainName string, genesisBlockNum uint64, l2ChainInfoFiles []string, l2ChainInfoJson string) (*params.ChainConfig, error) { diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 7e089d946c..9b34d2371e 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -814,6 +814,9 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c if chainInfo.SequencerUrl != "" { chainDefaults["execution.forwarding-target"] = chainInfo.SequencerUrl } + if chainInfo.SecondaryForwardingTarget != "" { + chainDefaults["execution.secondary-forwarding-target"] = strings.Split(chainInfo.SecondaryForwardingTarget, ",") + } if chainInfo.FeedUrl != "" { chainDefaults["node.feed.input.url"] = strings.Split(chainInfo.FeedUrl, ",") } From 2dfe8b45109f913bc8d445d3ab1a34b6a3eca671 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 28 Nov 2023 11:37:04 -0700 Subject: [PATCH 05/12] Use normal gas estimation when possible in the batch poster --- arbnode/batch_poster.go | 60 ++++++++++++++++--------------- arbnode/dataposter/data_poster.go | 20 ++++++++--- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index edfb5c35d2..498216cfd4 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode/dataposter" "github.com/offchainlabs/nitro/arbnode/redislock" @@ -757,32 +756,26 @@ func (b *BatchPoster) encodeAddBatch(seqNum *big.Int, prevMsgNum arbutil.Message return fullData, nil } -func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64) (uint64, error) { +func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64, realData []byte, realNonce uint64) (uint64, error) { config := b.config() - callOpts := &bind.CallOpts{ - Context: ctx, - } - if config.DataPoster.WaitForL1Finality { - callOpts.BlockNumber = big.NewInt(int64(rpc.SafeBlockNumber)) - } - safeDelayedMessagesBig, err := b.bridge.DelayedMessageCount(callOpts) - if err != nil { - return 0, fmt.Errorf("failed to get the confirmed delayed message count: %w", err) - } - if !safeDelayedMessagesBig.IsUint64() { - return 0, fmt.Errorf("calling delayedMessageCount() on the bridge returned a non-uint64 result %v", safeDelayedMessagesBig) - } - safeDelayedMessages := safeDelayedMessagesBig.Uint64() - if safeDelayedMessages > delayedMessages { - // On restart, we may be trying to estimate gas for a batch whose successor has - // already made it into pending state, if not latest state. - // In that case, we might get a revert with `DelayedBackwards()`. - // To avoid that, we artificially increase the delayed messages to `safeDelayedMessages`. - // In theory, this might reduce gas usage, but only by a factor that's already - // accounted for in `config.ExtraBatchGas`, as that same factor can appear if a user - // posts a new delayed message that we didn't see while gas estimating. - delayedMessages = safeDelayedMessages + useNormalEstimation := b.dataPoster.MaxMempoolTransactions() == 1 + if !useNormalEstimation { + // Check if we can use normal estimation anyways because we're at the latest nonce + latestNonce, err := b.l1Reader.Client().NonceAt(ctx, b.dataPoster.Sender(), nil) + if err != nil { + return 0, err + } + useNormalEstimation = latestNonce == realNonce + } + if useNormalEstimation { + // If we're at the latest nonce, we can skip the special future tx estimate stuff + return b.l1Reader.Client().EstimateGas(ctx, ethereum.CallMsg{ + From: b.dataPoster.Sender(), + To: &b.seqInboxAddr, + Data: realData, + }) } + // Here we set seqNum to MaxUint256, and prevMsgNum to 0, because it disables the smart contracts' consistency checks. // However, we set nextMsgNum to 1 because it is necessary for a correct estimation for the final to be non-zero. // Because we're likely estimating against older state, this might not be the actual next message, @@ -805,7 +798,6 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, "error estimating gas for batch", "err", err, "delayedMessages", delayedMessages, - "safeDelayedMessages", safeDelayedMessages, "sequencerMessageHeader", hex.EncodeToString(sequencerMessageHeader), "sequencerMessageLen", len(sequencerMessage), ) @@ -858,6 +850,11 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) } firstMsgTime := time.Unix(int64(firstMsg.Message.Header.Timestamp), 0) + lastPotentialMsg, err := b.streamer.GetMessage(msgCount - 1) + if err != nil { + return false, err + } + config := b.config() forcePostBatch := time.Since(firstMsgTime) >= config.MaxDelay @@ -1000,11 +997,18 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) } } - gasLimit, err := b.estimateGas(ctx, sequencerMsg, b.building.segments.delayedMsg) + data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg) if err != nil { return false, err } - data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg) + // On restart, we may be trying to estimate gas for a batch whose successor has + // already made it into pending state, if not latest state. + // In that case, we might get a revert with `DelayedBackwards()`. + // To avoid that, we artificially increase the delayed messages to `lastPotentialMsg.DelayedMessagesRead`. + // In theory, this might reduce gas usage, but only by a factor that's already + // accounted for in `config.ExtraBatchGas`, as that same factor can appear if a user + // posts a new delayed message that we didn't see while gas estimating. + gasLimit, err := b.estimateGas(ctx, sequencerMsg, lastPotentialMsg.DelayedMessagesRead, data, nonce) if err != nil { return false, err } diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index b5be06af56..266131a6b9 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -56,6 +56,7 @@ type DataPoster struct { signer signerFn redisLock AttemptLocker config ConfigFetcher + usingNoOpStorage bool replacementTimes []time.Duration metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error) @@ -119,8 +120,9 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro if err != nil { return nil, err } + useNoOpStorage := cfg.UseNoOpStorage if opts.HeaderReader.IsParentChainArbitrum() && !cfg.UseNoOpStorage { - cfg.UseNoOpStorage = true + useNoOpStorage = true log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool") } encF := func() storage.EncoderDecoderInterface { @@ -131,7 +133,7 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro } var queue QueueStorage switch { - case cfg.UseNoOpStorage: + case useNoOpStorage: queue = &noop.Storage{} case opts.RedisClient != nil: var err error @@ -158,6 +160,7 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro return opts.Auth.Signer(addr, tx) }, config: opts.Config, + usingNoOpStorage: useNoOpStorage, replacementTimes: replacementTimes, metadataRetriever: opts.MetadataRetriever, queue: queue, @@ -252,6 +255,13 @@ func (p *DataPoster) Sender() common.Address { return p.sender } +func (p *DataPoster) MaxMempoolTransactions() uint64 { + if p.usingNoOpStorage { + return 1 + } + return p.config().MaxMempoolTransactions +} + // Does basic check whether posting transaction with specified nonce would // result in exceeding maximum queue length or maximum transactions in mempool. func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) error { @@ -398,7 +408,7 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u latestBalance := p.balance balanceForTx := new(big.Int).Set(latestBalance) - if config.AllocateMempoolBalance && !config.UseNoOpStorage { + if config.AllocateMempoolBalance && !p.usingNoOpStorage { // We reserve half the balance for the first transaction, and then split the remaining balance for all after that. // With noop storage, we don't try to replace-by-fee, so we don't need to worry about this. balanceForTx.Div(balanceForTx, common.Big2) @@ -500,12 +510,12 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti } if err := p.client.SendTransaction(ctx, newTx.FullTx); err != nil { if !strings.Contains(err.Error(), "already known") && !strings.Contains(err.Error(), "nonce too low") { - log.Warn("DataPoster failed to send transaction", "err", err, "nonce", newTx.FullTx.Nonce(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap()) + log.Warn("DataPoster failed to send transaction", "err", err, "nonce", newTx.FullTx.Nonce(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap(), "gas", newTx.FullTx.Gas()) return err } log.Info("DataPoster transaction already known", "err", err, "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash()) } else { - log.Info("DataPoster sent transaction", "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash(), "feeCap", newTx.FullTx.GasFeeCap()) + log.Info("DataPoster sent transaction", "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap(), "gas", newTx.FullTx.Gas()) } newerTx := *newTx newerTx.Sent = true From 8c23ffc327ef8465dc406e1ac32e3e3216597ba5 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 28 Nov 2023 14:07:48 -0700 Subject: [PATCH 06/12] Don't forget ExtraBatchGas for normal estimation --- arbnode/batch_poster.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 498216cfd4..d96ed57238 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -769,11 +769,15 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, } if useNormalEstimation { // If we're at the latest nonce, we can skip the special future tx estimate stuff - return b.l1Reader.Client().EstimateGas(ctx, ethereum.CallMsg{ + gas, err := b.l1Reader.Client().EstimateGas(ctx, ethereum.CallMsg{ From: b.dataPoster.Sender(), To: &b.seqInboxAddr, Data: realData, }) + if err != nil { + return 0, err + } + return gas + config.ExtraBatchGas, nil } // Here we set seqNum to MaxUint256, and prevMsgNum to 0, because it disables the smart contracts' consistency checks. From 4f97815165b40933f3a2c76ed6d6db29ab88d0fb Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Wed, 29 Nov 2023 11:07:04 +0530 Subject: [PATCH 07/12] code refactor --- broadcastclients/broadcastclients.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 3e048000e0..aa5f39ad90 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -253,13 +253,12 @@ func (bcs *BroadcastClients) stopSecondaryFeed() { log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos]) // flush the secondary feeds' message and confirmedSequenceNumber channels - f: for { select { case <-bcs.secondaryRouter.messageChan: case <-bcs.secondaryRouter.confirmedSequenceNumberChan: default: - break f + return } } } From 02b66733aa99c9894430353d221d71fecbca0f2d Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Thu, 30 Nov 2023 19:12:20 +0530 Subject: [PATCH 08/12] address PR comments --- broadcastclients/broadcastclients.go | 34 +++++++++++++++------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index aa5f39ad90..ed11ab87da 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -142,7 +142,6 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { defer primaryFeedIsDownTimer.Stop() msgHandler := func(msg broadcaster.BroadcastFeedMessage, router *Router) error { - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { return nil } @@ -156,7 +155,6 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { return nil } confSeqHandler := func(cs arbutil.MessageIndex, router *Router) { - startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) if cs == lastConfirmed { return } @@ -166,56 +164,60 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { } } - // Two select statements to prioritize reading messages from primary feeds' channels + // Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers for { + select { + // Cycle buckets to get rid of old entries + case <-recentFeedItemsCleanup.C: + recentFeedItemsOld = recentFeedItemsNew + recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) + // Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed + case <-stopSecondaryFeedTimer.C: + bcs.stopSecondaryFeed() + default: + } + select { case <-ctx.Done(): return // Primary feeds case msg := <-bcs.primaryRouter.messageChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) if err := msgHandler(msg, bcs.primaryRouter); err != nil { log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) confSeqHandler(cs, bcs.primaryRouter) - // Cycle buckets to get rid of old entries - case <-recentFeedItemsCleanup.C: - recentFeedItemsOld = recentFeedItemsNew - recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) - // Failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed - case <-startSecondaryFeedTimer.C: - bcs.startSecondaryFeed(ctx) // Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary case <-primaryFeedIsDownTimer.C: stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME) - // Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed - case <-stopSecondaryFeedTimer.C: - bcs.stopSecondaryFeed() default: select { case <-ctx.Done(): return // Secondary Feeds case msg := <-bcs.secondaryRouter.messageChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) if err := msgHandler(msg, bcs.secondaryRouter); err != nil { log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) } case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) confSeqHandler(cs, bcs.secondaryRouter) case msg := <-bcs.primaryRouter.messageChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) if err := msgHandler(msg, bcs.primaryRouter); err != nil { log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) confSeqHandler(cs, bcs.primaryRouter) - case <-recentFeedItemsCleanup.C: - recentFeedItemsOld = recentFeedItemsNew - recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) case <-startSecondaryFeedTimer.C: bcs.startSecondaryFeed(ctx) case <-primaryFeedIsDownTimer.C: From 59f151b3fbf48b6c0fe03d60b3cf0adab3224235 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 30 Nov 2023 17:58:05 -0700 Subject: [PATCH 09/12] Update arbitrator OSP fuzzer to work with new OSP ABI --- arbitrator/prover/fuzz/fuzz_targets/osp.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/arbitrator/prover/fuzz/fuzz_targets/osp.rs b/arbitrator/prover/fuzz/fuzz_targets/osp.rs index 1ceef8355c..ddb5ebfdfd 100644 --- a/arbitrator/prover/fuzz/fuzz_targets/osp.rs +++ b/arbitrator/prover/fuzz/fuzz_targets/osp.rs @@ -39,10 +39,9 @@ fn get_contract_deployed_bytecode(contract: &str) -> Vec { lazy_static::lazy_static! { static ref OSP_PREFIX: Vec = { let mut data = Vec::new(); - data.extend(hex::decode("2fae8811").unwrap()); // function selector + data.extend(hex::decode("5d3adcfb").unwrap()); // function selector data.extend([0; 32]); // maxInboxMessagesRead - data.extend([0; 32]); // sequencerInbox - data.extend([0; 32]); // delayedInbox + data.extend([0; 32]); // bridge data }; static ref EVM_VICINITY: evm::backend::MemoryVicinity = { From aa59d344b37422beeb42a13ffb9f03527c069e09 Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Fri, 1 Dec 2023 13:53:36 -0700 Subject: [PATCH 10/12] Fix chain parameter default Chain parameters were updated to prepare for execution split, but one mention was missed --- cmd/nitro/nitro.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index cee86ab40a..cf2155444a 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -839,7 +839,7 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c } safeBatchSize := l2MaxTxSize - bufferSpace chainDefaults["node.batch-poster.max-size"] = safeBatchSize - chainDefaults["node.sequencer.max-tx-data-size"] = safeBatchSize - bufferSpace + chainDefaults["execution.sequencer.max-tx-data-size"] = safeBatchSize - bufferSpace } if chainInfo.DasIndexUrl != "" { chainDefaults["node.batch-poster.max-size"] = 1000000 From 1d691791f86329bd7cc16aab100efd33b2e14dfb Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Fri, 1 Dec 2023 14:18:01 -0700 Subject: [PATCH 11/12] Use field name instead of variable name in warning message --- cmd/nitro/nitro.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index cee86ab40a..e0b6d326ac 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -800,7 +800,7 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c if chainInfo.ParentChainIsArbitrum != nil { parentChainIsArbitrum = *chainInfo.ParentChainIsArbitrum } else { - log.Warn("Chain information parentChainIsArbitrum field missing, in the future this will be required", "chainId", chainInfo.ChainConfig.ChainID, "parentChainId", chainInfo.ParentChainId) + log.Warn("Chain info field parent-chain-is-arbitrum is missing, in the future this will be required", "chainId", chainInfo.ChainConfig.ChainID, "parentChainId", chainInfo.ParentChainId) _, err := chaininfo.ProcessChainInfo(chainInfo.ParentChainId, "", combinedL2ChainInfoFiles, "") if err == nil { parentChainIsArbitrum = true From 3eb4212a5657bb7bdb60602136c7989843d905fc Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Sat, 2 Dec 2023 12:04:30 -0700 Subject: [PATCH 12/12] Always initialize staker wallet --- arbnode/node.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index e52a698e4e..c6e117e700 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -615,10 +615,8 @@ func createNodeImpl( if err != nil { return nil, err } - if stakerObj.Strategy() == staker.WatchtowerStrategy { - if err := wallet.Initialize(ctx); err != nil { - return nil, err - } + if err := wallet.Initialize(ctx); err != nil { + return nil, err } var txValidatorSenderPtr *common.Address if txOptsValidator != nil {