diff --git a/arbitrator/prover/fuzz/fuzz_targets/osp.rs b/arbitrator/prover/fuzz/fuzz_targets/osp.rs
index 1ceef8355c..ddb5ebfdfd 100644
--- a/arbitrator/prover/fuzz/fuzz_targets/osp.rs
+++ b/arbitrator/prover/fuzz/fuzz_targets/osp.rs
@@ -39,10 +39,9 @@ fn get_contract_deployed_bytecode(contract: &str) -> Vec<u8> {
 lazy_static::lazy_static! {
     static ref OSP_PREFIX: Vec<u8> = {
         let mut data = Vec::new();
-        data.extend(hex::decode("2fae8811").unwrap()); // function selector
+        data.extend(hex::decode("5d3adcfb").unwrap()); // function selector
         data.extend([0; 32]); // maxInboxMessagesRead
-        data.extend([0; 32]); // sequencerInbox
-        data.extend([0; 32]); // delayedInbox
+        data.extend([0; 32]); // bridge
         data
     };
     static ref EVM_VICINITY: evm::backend::MemoryVicinity = {
diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go
index edfb5c35d2..d96ed57238 100644
--- a/arbnode/batch_poster.go
+++ b/arbnode/batch_poster.go
@@ -27,7 +27,6 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/rlp"
-	"github.com/ethereum/go-ethereum/rpc"

 	"github.com/offchainlabs/nitro/arbnode/dataposter"
 	"github.com/offchainlabs/nitro/arbnode/redislock"
@@ -757,32 +756,30 @@ func (b *BatchPoster) encodeAddBatch(seqNum *big.Int, prevMsgNum arbutil.Message
 	return fullData, nil
 }

-func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64) (uint64, error) {
+func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64, realData []byte, realNonce uint64) (uint64, error) {
 	config := b.config()
-	callOpts := &bind.CallOpts{
-		Context: ctx,
-	}
-	if config.DataPoster.WaitForL1Finality {
-		callOpts.BlockNumber = big.NewInt(int64(rpc.SafeBlockNumber))
-	}
-	safeDelayedMessagesBig, err := b.bridge.DelayedMessageCount(callOpts)
-	if err != nil {
-		return 0, fmt.Errorf("failed to get the confirmed delayed message count: %w", err)
-	}
-	if !safeDelayedMessagesBig.IsUint64() {
-		return 0, fmt.Errorf("calling delayedMessageCount() on the bridge returned a non-uint64 result %v", safeDelayedMessagesBig)
-	}
-	safeDelayedMessages := safeDelayedMessagesBig.Uint64()
-	if safeDelayedMessages > delayedMessages {
-		// On restart, we may be trying to estimate gas for a batch whose successor has
-		// already made it into pending state, if not latest state.
-		// In that case, we might get a revert with `DelayedBackwards()`.
-		// To avoid that, we artificially increase the delayed messages to `safeDelayedMessages`.
-		// In theory, this might reduce gas usage, but only by a factor that's already
-		// accounted for in `config.ExtraBatchGas`, as that same factor can appear if a user
-		// posts a new delayed message that we didn't see while gas estimating.
-		delayedMessages = safeDelayedMessages
+	useNormalEstimation := b.dataPoster.MaxMempoolTransactions() == 1
+	if !useNormalEstimation {
+		// Check if we can use normal estimation anyways because we're at the latest nonce
+		latestNonce, err := b.l1Reader.Client().NonceAt(ctx, b.dataPoster.Sender(), nil)
+		if err != nil {
+			return 0, err
+		}
+		useNormalEstimation = latestNonce == realNonce
+	}
+	if useNormalEstimation {
+		// If we're at the latest nonce, we can skip the special future tx estimate stuff
+		gas, err := b.l1Reader.Client().EstimateGas(ctx, ethereum.CallMsg{
+			From: b.dataPoster.Sender(),
+			To:   &b.seqInboxAddr,
+			Data: realData,
+		})
+		if err != nil {
+			return 0, err
+		}
+		return gas + config.ExtraBatchGas, nil
 	}
+
 	// Here we set seqNum to MaxUint256, and prevMsgNum to 0, because it disables the smart contracts' consistency checks.
 	// However, we set nextMsgNum to 1 because it is necessary for a correct estimation for the final to be non-zero.
 	// Because we're likely estimating against older state, this might not be the actual next message,
@@ -805,7 +802,6 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
 			"error estimating gas for batch",
 			"err", err,
 			"delayedMessages", delayedMessages,
-			"safeDelayedMessages", safeDelayedMessages,
 			"sequencerMessageHeader", hex.EncodeToString(sequencerMessageHeader),
 			"sequencerMessageLen", len(sequencerMessage),
 		)
@@ -858,6 +854,11 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
 	}
 	firstMsgTime := time.Unix(int64(firstMsg.Message.Header.Timestamp), 0)
+	lastPotentialMsg, err := b.streamer.GetMessage(msgCount - 1)
+	if err != nil {
+		return false, err
+	}
+
 	config := b.config()
 	forcePostBatch := time.Since(firstMsgTime) >= config.MaxDelay
@@ -1000,11 +1001,18 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
 		}
 	}
-	gasLimit, err := b.estimateGas(ctx, sequencerMsg, b.building.segments.delayedMsg)
+	data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
 	if err != nil {
 		return false, err
 	}
-	data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
+	// On restart, we may be trying to estimate gas for a batch whose successor has
+	// already made it into pending state, if not latest state.
+	// In that case, we might get a revert with `DelayedBackwards()`.
+	// To avoid that, we artificially increase the delayed messages to `lastPotentialMsg.DelayedMessagesRead`.
+	// In theory, this might reduce gas usage, but only by a factor that's already
+	// accounted for in `config.ExtraBatchGas`, as that same factor can appear if a user
+	// posts a new delayed message that we didn't see while gas estimating.
+	gasLimit, err := b.estimateGas(ctx, sequencerMsg, lastPotentialMsg.DelayedMessagesRead, data, nonce)
 	if err != nil {
 		return false, err
 	}
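
Note on the estimateGas change above (outside the diff): gas is now estimated against the real batch calldata and latest state whenever that is known to be accurate, and the batch poster only falls back to the sentinel-argument "future transaction" estimate when earlier transactions are still queued ahead of it in the mempool. Below is a minimal sketch of that decision, assuming an ethclient-style client; the names EthClient, extraBatchGas, and futureEstimate are illustrative, not Nitro identifiers.

package sketch

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
)

// EthClient is the subset of ethclient.Client this sketch needs.
type EthClient interface {
	NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
	EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint64, error)
}

// estimateBatchGas uses a normal estimate when the poster can only ever have
// one transaction in the mempool (e.g. no-op data-poster storage) or when the
// batch transaction's nonce is the account's latest nonce; otherwise it defers
// to a caller-supplied estimate of a hypothetical future call.
func estimateBatchGas(
	ctx context.Context,
	client EthClient,
	from common.Address,
	seqInbox common.Address,
	realData []byte, // actual batch calldata
	realNonce uint64, // nonce the batch transaction will use
	maxMempoolTxs uint64, // 1 when replace-by-fee is unavailable
	extraBatchGas uint64, // fixed safety margin added on top
	futureEstimate func(context.Context) (uint64, error),
) (uint64, error) {
	useNormal := maxMempoolTxs == 1
	if !useNormal {
		// A nil block number means "latest"; if nothing is queued ahead of
		// this transaction, a normal estimate is still valid.
		latestNonce, err := client.NonceAt(ctx, from, nil)
		if err != nil {
			return 0, err
		}
		useNormal = latestNonce == realNonce
	}
	if useNormal {
		gas, err := client.EstimateGas(ctx, ethereum.CallMsg{From: from, To: &seqInbox, Data: realData})
		if err != nil {
			return 0, err
		}
		return gas + extraBatchGas, nil
	}
	// Transactions are queued ahead of this one, so the real call could revert
	// against latest state; estimate a hypothetical future call instead.
	return futureEstimate(ctx)
}

In the diff itself the future-call path is the pre-existing MaxUint256 seqNum / zero prevMsgNum estimate, and the margin comes from config.ExtraBatchGas.
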
diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go
index b5be06af56..266131a6b9 100644
--- a/arbnode/dataposter/data_poster.go
+++ b/arbnode/dataposter/data_poster.go
@@ -56,6 +56,7 @@ type DataPoster struct {
 	signer            signerFn
 	redisLock         AttemptLocker
 	config            ConfigFetcher
+	usingNoOpStorage  bool
 	replacementTimes  []time.Duration
 	metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)

@@ -119,8 +120,9 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
 	if err != nil {
 		return nil, err
 	}
+	useNoOpStorage := cfg.UseNoOpStorage
 	if opts.HeaderReader.IsParentChainArbitrum() && !cfg.UseNoOpStorage {
-		cfg.UseNoOpStorage = true
+		useNoOpStorage = true
 		log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool")
 	}
 	encF := func() storage.EncoderDecoderInterface {
@@ -131,7 +133,7 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
 	}
 	var queue QueueStorage
 	switch {
-	case cfg.UseNoOpStorage:
+	case useNoOpStorage:
 		queue = &noop.Storage{}
 	case opts.RedisClient != nil:
 		var err error
@@ -158,6 +160,7 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
 			return opts.Auth.Signer(addr, tx)
 		},
 		config:            opts.Config,
+		usingNoOpStorage:  useNoOpStorage,
 		replacementTimes:  replacementTimes,
 		metadataRetriever: opts.MetadataRetriever,
 		queue:             queue,
@@ -252,6 +255,13 @@ func (p *DataPoster) Sender() common.Address {
 	return p.sender
 }

+func (p *DataPoster) MaxMempoolTransactions() uint64 {
+	if p.usingNoOpStorage {
+		return 1
+	}
+	return p.config().MaxMempoolTransactions
+}
+
 // Does basic check whether posting transaction with specified nonce would
 // result in exceeding maximum queue length or maximum transactions in mempool.
 func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) error {
@@ -398,7 +408,7 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u
 	latestBalance := p.balance
 	balanceForTx := new(big.Int).Set(latestBalance)
-	if config.AllocateMempoolBalance && !config.UseNoOpStorage {
+	if config.AllocateMempoolBalance && !p.usingNoOpStorage {
 		// We reserve half the balance for the first transaction, and then split the remaining balance for all after that.
 		// With noop storage, we don't try to replace-by-fee, so we don't need to worry about this.
 		balanceForTx.Div(balanceForTx, common.Big2)
@@ -500,12 +510,12 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti
 	}
 	if err := p.client.SendTransaction(ctx, newTx.FullTx); err != nil {
 		if !strings.Contains(err.Error(), "already known") && !strings.Contains(err.Error(), "nonce too low") {
-			log.Warn("DataPoster failed to send transaction", "err", err, "nonce", newTx.FullTx.Nonce(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap())
+			log.Warn("DataPoster failed to send transaction", "err", err, "nonce", newTx.FullTx.Nonce(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap(), "gas", newTx.FullTx.Gas())
 			return err
 		}
 		log.Info("DataPoster transaction already known", "err", err, "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash())
 	} else {
-		log.Info("DataPoster sent transaction", "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash(), "feeCap", newTx.FullTx.GasFeeCap())
+		log.Info("DataPoster sent transaction", "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap(), "gas", newTx.FullTx.Gas())
 	}
 	newerTx := *newTx
 	newerTx.Sent = true
diff --git a/arbnode/node.go b/arbnode/node.go
index e52a698e4e..c6e117e700 100644
--- a/arbnode/node.go
+++ b/arbnode/node.go
@@ -615,10 +615,8 @@ func createNodeImpl(
 		if err != nil {
 			return nil, err
 		}
-		if stakerObj.Strategy() == staker.WatchtowerStrategy {
-			if err := wallet.Initialize(ctx); err != nil {
-				return nil, err
-			}
+		if err := wallet.Initialize(ctx); err != nil {
+			return nil, err
 		}
 		var txValidatorSenderPtr *common.Address
 		if txOptsValidator != nil {
diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go
index b2824221ea..ed11ab87da 100644
--- a/broadcastclients/broadcastclients.go
+++ b/broadcastclients/broadcastclients.go
@@ -140,75 +140,89 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
 		defer startSecondaryFeedTimer.Stop()
 		defer stopSecondaryFeedTimer.Stop()
 		defer primaryFeedIsDownTimer.Stop()
+
+		msgHandler := func(msg broadcaster.BroadcastFeedMessage, router *Router) error {
+			if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
+				return nil
+			}
+			if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
+				return nil
+			}
+			recentFeedItemsNew[msg.SequenceNumber] = time.Now()
+			if err := router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
+				return err
+			}
+			return nil
+		}
+		confSeqHandler := func(cs arbutil.MessageIndex, router *Router) {
+			if cs == lastConfirmed {
+				return
+			}
+			lastConfirmed = cs
+			if router.forwardConfirmationChan != nil {
+				router.forwardConfirmationChan <- cs
+			}
+		}
+
+		// Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers
 		for {
+			select {
+			// Cycle buckets to get rid of old entries
+			case <-recentFeedItemsCleanup.C:
+				recentFeedItemsOld = recentFeedItemsNew
+				recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
+			// Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
+			case <-stopSecondaryFeedTimer.C:
+				bcs.stopSecondaryFeed()
+			default:
+			}
+
 			select {
 			case <-ctx.Done():
 				return
-
 			// Primary feeds
 			case msg := <-bcs.primaryRouter.messageChan:
 				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
 				primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
-					continue
-				}
-				if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
-					continue
-				}
-				recentFeedItemsNew[msg.SequenceNumber] = time.Now()
-				if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
+				if err := msgHandler(msg, bcs.primaryRouter); err != nil {
 					log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
 				}
 			case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
 				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
 				primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if cs == lastConfirmed {
-					continue
-				}
-				lastConfirmed = cs
-				if bcs.primaryRouter.forwardConfirmationChan != nil {
-					bcs.primaryRouter.forwardConfirmationChan <- cs
-				}
-
-			// Secondary Feeds
-			case msg := <-bcs.secondaryRouter.messageChan:
-				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
-					continue
-				}
-				if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
-					continue
-				}
-				recentFeedItemsNew[msg.SequenceNumber] = time.Now()
-				if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
-					log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
-				}
-			case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
-				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if cs == lastConfirmed {
-					continue
-				}
-				lastConfirmed = cs
-				if bcs.secondaryRouter.forwardConfirmationChan != nil {
-					bcs.secondaryRouter.forwardConfirmationChan <- cs
-				}
-
-			// Cycle buckets to get rid of old entries
-			case <-recentFeedItemsCleanup.C:
-				recentFeedItemsOld = recentFeedItemsNew
-				recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
-
-			// failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed
-			case <-startSecondaryFeedTimer.C:
-				bcs.startSecondaryFeed(ctx)
-
-			// failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
+				confSeqHandler(cs, bcs.primaryRouter)
+			// Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
 			case <-primaryFeedIsDownTimer.C:
 				stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
+			default:
+				select {
+				case <-ctx.Done():
+					return
+				// Secondary Feeds
+				case msg := <-bcs.secondaryRouter.messageChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					if err := msgHandler(msg, bcs.secondaryRouter); err != nil {
+						log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
+					}
+				case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					confSeqHandler(cs, bcs.secondaryRouter)
-
-			// primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
-			case <-stopSecondaryFeedTimer.C:
-				bcs.stopSecondaryFeed()
+				case msg := <-bcs.primaryRouter.messageChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					if err := msgHandler(msg, bcs.primaryRouter); err != nil {
+						log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
+					}
+				case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					confSeqHandler(cs, bcs.primaryRouter)
+				case <-startSecondaryFeedTimer.C:
+					bcs.startSecondaryFeed(ctx)
+				case <-primaryFeedIsDownTimer.C:
+					stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
+				}
 			}
 		}
 	})
@@ -239,6 +253,16 @@ func (bcs *BroadcastClients) stopSecondaryFeed() {
 		bcs.secondaryClients[pos].StopAndWait()
 		bcs.secondaryClients = bcs.secondaryClients[:pos]
 		log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos])
+
+		// flush the secondary feeds' message and confirmedSequenceNumber channels
+		for {
+			select {
+			case <-bcs.secondaryRouter.messageChan:
+			case <-bcs.secondaryRouter.confirmedSequenceNumberChan:
+			default:
+				return
+			}
+		}
 	}
 }
diff --git a/cmd/chaininfo/arbitrum_chain_info.json b/cmd/chaininfo/arbitrum_chain_info.json
index 421abea20a..31d25cfdf5 100644
--- a/cmd/chaininfo/arbitrum_chain_info.json
+++ b/cmd/chaininfo/arbitrum_chain_info.json
@@ -4,6 +4,7 @@
         "parent-chain-id": 1,
         "parent-chain-is-arbitrum": false,
         "sequencer-url": "https://arb1-sequencer.arbitrum.io/rpc",
+        "secondary-forwarding-target": "https://arb1-sequencer-fallback-1.arbitrum.io/rpc,https://arb1-sequencer-fallback-2.arbitrum.io/rpc,https://arb1-sequencer-fallback-3.arbitrum.io/rpc,https://arb1-sequencer-fallback-4.arbitrum.io/rpc,https://arb1-sequencer-fallback-5.arbitrum.io/rpc",
         "feed-url": "wss://arb1-feed.arbitrum.io/feed",
         "secondary-feed-url": "wss://arb1-delayed-feed.arbitrum.io/feed,wss://arb1-feed-fallback-1.arbitrum.io/feed,wss://arb1-feed-fallback-2.arbitrum.io/feed,wss://arb1-feed-fallback-3.arbitrum.io/feed,wss://arb1-feed-fallback-4.arbitrum.io/feed,wss://arb1-feed-fallback-5.arbitrum.io/feed",
         "has-genesis-state": true,
diff --git a/cmd/chaininfo/chain_info.go b/cmd/chaininfo/chain_info.go
index fe499442d2..c0d6b3d005 100644
--- a/cmd/chaininfo/chain_info.go
+++ b/cmd/chaininfo/chain_info.go
@@ -22,13 +22,14 @@ type ChainInfo struct {
 	ParentChainId         uint64 `json:"parent-chain-id"`
 	ParentChainIsArbitrum *bool  `json:"parent-chain-is-arbitrum"`
 	// This is the forwarding target to submit transactions to, called the sequencer URL for clarity
-	SequencerUrl     string              `json:"sequencer-url"`
-	FeedUrl          string              `json:"feed-url"`
-	SecondaryFeedUrl string              `json:"secondary-feed-url"`
-	DasIndexUrl      string              `json:"das-index-url"`
-	HasGenesisState  bool                `json:"has-genesis-state"`
-	ChainConfig      *params.ChainConfig `json:"chain-config"`
-	RollupAddresses  *RollupAddresses    `json:"rollup"`
+	SequencerUrl              string              `json:"sequencer-url"`
+	SecondaryForwardingTarget string              `json:"secondary-forwarding-target"`
+	FeedUrl                   string              `json:"feed-url"`
+	SecondaryFeedUrl          string              `json:"secondary-feed-url"`
+	DasIndexUrl               string              `json:"das-index-url"`
+	HasGenesisState           bool                `json:"has-genesis-state"`
+	ChainConfig               *params.ChainConfig `json:"chain-config"`
+	RollupAddresses           *RollupAddresses    `json:"rollup"`
 }

 func GetChainConfig(chainId *big.Int, chainName string, genesisBlockNum uint64, l2ChainInfoFiles []string, l2ChainInfoJson string) (*params.ChainConfig, error) {
diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go
index c113d08976..790cc874d3 100644
--- a/cmd/nitro/nitro.go
+++ b/cmd/nitro/nitro.go
@@ -839,7 +839,7 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
 	if chainInfo.ParentChainIsArbitrum != nil {
 		parentChainIsArbitrum = *chainInfo.ParentChainIsArbitrum
 	} else {
-		log.Warn("Chain information parentChainIsArbitrum field missing, in the future this will be required", "chainId", chainInfo.ChainConfig.ChainID, "parentChainId", chainInfo.ParentChainId)
+		log.Warn("Chain info field parent-chain-is-arbitrum is missing, in the future this will be required", "chainId", chainInfo.ChainConfig.ChainID, "parentChainId", chainInfo.ParentChainId)
 		_, err := chaininfo.ProcessChainInfo(chainInfo.ParentChainId, "", combinedL2ChainInfoFiles, "")
 		if err == nil {
 			parentChainIsArbitrum = true
@@ -853,6 +853,9 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
 	if chainInfo.SequencerUrl != "" {
 		chainDefaults["execution.forwarding-target"] = chainInfo.SequencerUrl
 	}
+	if chainInfo.SecondaryForwardingTarget != "" {
+		chainDefaults["execution.secondary-forwarding-target"] = strings.Split(chainInfo.SecondaryForwardingTarget, ",")
+	}
 	if chainInfo.FeedUrl != "" {
 		chainDefaults["node.feed.input.url"] = strings.Split(chainInfo.FeedUrl, ",")
 	}
@@ -875,7 +878,7 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
 		}
 		safeBatchSize := l2MaxTxSize - bufferSpace
 		chainDefaults["node.batch-poster.max-size"] = safeBatchSize
-		chainDefaults["node.sequencer.max-tx-data-size"] = safeBatchSize - bufferSpace
+		chainDefaults["execution.sequencer.max-tx-data-size"] = safeBatchSize - bufferSpace
 	}
 	if chainInfo.DasIndexUrl != "" {
 		chainDefaults["node.batch-poster.max-size"] = 1000000
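
Note on the broadcastclients.go rewrite above (outside the diff): the feed loop now layers its selects so primary-feed channels and housekeeping timers are always drained before a default branch falls through to a select that also includes the secondary feeds. The sketch below is a minimal, runnable illustration of that prioritization pattern with made-up channel names; it is not the Nitro code itself.

package main

import (
	"fmt"
	"time"
)

func main() {
	primary := make(chan int, 8)
	secondary := make(chan int, 8)

	// Queue work on both channels so the prioritization is visible:
	// every primary item is handled before any secondary item.
	for i := 0; i < 3; i++ {
		primary <- i
		secondary <- 100 + i
	}

	deadline := time.After(100 * time.Millisecond)
	for {
		// First select: non-blocking, high-priority channel only.
		select {
		case v := <-primary:
			fmt.Println("primary:", v)
			continue // re-check the high-priority channel before anything else
		default:
		}
		// Second select: primary was empty, so secondary (or shutdown) may run.
		// Primary is listed again so it is never starved while we block here.
		select {
		case v := <-primary:
			fmt.Println("primary:", v)
		case v := <-secondary:
			fmt.Println("secondary:", v)
		case <-deadline:
			return
		}
	}
}

Listing the primary channel again in the second select mirrors the diff's nested select: without it, a burst of secondary traffic could delay primary messages while the loop blocks there.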