Commit 1445f1a
Merge branch 'master' into validate-config-params
ganeshvanahalli authored Dec 4, 2023
2 parents a378c57 + ddf1229 commit 1445f1a
Showing 8 changed files with 146 additions and 102 deletions.
5 changes: 2 additions & 3 deletions arbitrator/prover/fuzz/fuzz_targets/osp.rs
@@ -39,10 +39,9 @@ fn get_contract_deployed_bytecode(contract: &str) -> Vec<u8> {
 lazy_static::lazy_static! {
     static ref OSP_PREFIX: Vec<u8> = {
         let mut data = Vec::new();
-        data.extend(hex::decode("2fae8811").unwrap()); // function selector
+        data.extend(hex::decode("5d3adcfb").unwrap()); // function selector
         data.extend([0; 32]); // maxInboxMessagesRead
-        data.extend([0; 32]); // sequencerInbox
-        data.extend([0; 32]); // delayedInbox
+        data.extend([0; 32]); // bridge
         data
     };
     static ref EVM_VICINITY: evm::backend::MemoryVicinity = {
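Side note on the selector change above: a 4-byte ABI selector is the first four bytes of the keccak-256 hash of the function's canonical signature, so the swap from 2fae8811 to 5d3adcfb reflects the prover entry point's new parameter list (a single bridge address in place of the sequencerInbox/delayedInbox pair). A minimal sketch of the computation in Go, using go-ethereum's crypto package; the ERC-20 signature below is just a well-known stand-in, not the prover's actual signature:

package main

import (
	"encoding/hex"
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

// selector returns the 4-byte ABI function selector: the first four
// bytes of keccak256 over the canonical signature string.
func selector(sig string) string {
	return hex.EncodeToString(crypto.Keccak256([]byte(sig))[:4])
}

func main() {
	// Well-known example for verification; prints "a9059cbb".
	fmt.Println(selector("transfer(address,uint256)"))
}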
64 changes: 36 additions & 28 deletions arbnode/batch_poster.go
@@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/redislock"
@@ -757,32 +756,30 @@ func (b *BatchPoster) encodeAddBatch(seqNum *big.Int, prevMsgNum arbutil.Message
 	return fullData, nil
 }
 
-func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64) (uint64, error) {
+func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64, realData []byte, realNonce uint64) (uint64, error) {
 	config := b.config()
-	callOpts := &bind.CallOpts{
-		Context: ctx,
-	}
-	if config.DataPoster.WaitForL1Finality {
-		callOpts.BlockNumber = big.NewInt(int64(rpc.SafeBlockNumber))
-	}
-	safeDelayedMessagesBig, err := b.bridge.DelayedMessageCount(callOpts)
-	if err != nil {
-		return 0, fmt.Errorf("failed to get the confirmed delayed message count: %w", err)
-	}
-	if !safeDelayedMessagesBig.IsUint64() {
-		return 0, fmt.Errorf("calling delayedMessageCount() on the bridge returned a non-uint64 result %v", safeDelayedMessagesBig)
-	}
-	safeDelayedMessages := safeDelayedMessagesBig.Uint64()
-	if safeDelayedMessages > delayedMessages {
-		// On restart, we may be trying to estimate gas for a batch whose successor has
-		// already made it into pending state, if not latest state.
-		// In that case, we might get a revert with `DelayedBackwards()`.
-		// To avoid that, we artificially increase the delayed messages to `safeDelayedMessages`.
-		// In theory, this might reduce gas usage, but only by a factor that's already
-		// accounted for in `config.ExtraBatchGas`, as that same factor can appear if a user
-		// posts a new delayed message that we didn't see while gas estimating.
-		delayedMessages = safeDelayedMessages
-	}
+	useNormalEstimation := b.dataPoster.MaxMempoolTransactions() == 1
+	if !useNormalEstimation {
+		// Check if we can use normal estimation anyways because we're at the latest nonce
+		latestNonce, err := b.l1Reader.Client().NonceAt(ctx, b.dataPoster.Sender(), nil)
+		if err != nil {
+			return 0, err
+		}
+		useNormalEstimation = latestNonce == realNonce
+	}
+	if useNormalEstimation {
+		// If we're at the latest nonce, we can skip the special future tx estimate stuff
+		gas, err := b.l1Reader.Client().EstimateGas(ctx, ethereum.CallMsg{
+			From: b.dataPoster.Sender(),
+			To:   &b.seqInboxAddr,
+			Data: realData,
+		})
+		if err != nil {
+			return 0, err
+		}
+		return gas + config.ExtraBatchGas, nil
+	}
 
 	// Here we set seqNum to MaxUint256, and prevMsgNum to 0, because it disables the smart contracts' consistency checks.
 	// However, we set nextMsgNum to 1 because it is necessary for a correct estimation for the final to be non-zero.
 	// Because we're likely estimating against older state, this might not be the actual next message,
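For readers unfamiliar with the "normal estimation" branch above: it is a plain eth_estimateGas against latest state with the batch's real calldata, padded by config.ExtraBatchGas. A self-contained sketch of that call with go-ethereum's ethclient; the endpoint, addresses, and padding constant are placeholders, not values from this repository:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("https://example-l1-endpoint") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}
	from := common.HexToAddress("0x0000000000000000000000000000000000000001") // placeholder sender
	to := common.HexToAddress("0x0000000000000000000000000000000000000002")   // placeholder inbox
	gas, err := client.EstimateGas(context.Background(), ethereum.CallMsg{
		From: from,
		To:   &to,
		Data: []byte{}, // the real calldata would be the encoded batch-posting call
	})
	if err != nil {
		log.Fatal(err)
	}
	const extraBatchGas = 50_000 // stand-in for config.ExtraBatchGas
	fmt.Println("gas limit:", gas+extraBatchGas)
}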
@@ -805,7 +802,6 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
"error estimating gas for batch",
"err", err,
"delayedMessages", delayedMessages,
"safeDelayedMessages", safeDelayedMessages,
"sequencerMessageHeader", hex.EncodeToString(sequencerMessageHeader),
"sequencerMessageLen", len(sequencerMessage),
)
@@ -858,6 +854,11 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
 	}
 	firstMsgTime := time.Unix(int64(firstMsg.Message.Header.Timestamp), 0)
 
+	lastPotentialMsg, err := b.streamer.GetMessage(msgCount - 1)
+	if err != nil {
+		return false, err
+	}
+
 	config := b.config()
 	forcePostBatch := time.Since(firstMsgTime) >= config.MaxDelay

@@ -1000,11 +1001,18 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
 		}
 	}
 
-	gasLimit, err := b.estimateGas(ctx, sequencerMsg, b.building.segments.delayedMsg)
+	data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
 	if err != nil {
 		return false, err
 	}
-	data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
+	// On restart, we may be trying to estimate gas for a batch whose successor has
+	// already made it into pending state, if not latest state.
+	// In that case, we might get a revert with `DelayedBackwards()`.
+	// To avoid that, we artificially increase the delayed messages to `lastPotentialMsg.DelayedMessagesRead`.
+	// In theory, this might reduce gas usage, but only by a factor that's already
+	// accounted for in `config.ExtraBatchGas`, as that same factor can appear if a user
+	// posts a new delayed message that we didn't see while gas estimating.
+	gasLimit, err := b.estimateGas(ctx, sequencerMsg, lastPotentialMsg.DelayedMessagesRead, data, nonce)
 	if err != nil {
 		return false, err
 	}
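The reordering above exists so that estimateGas can receive the batch's real calldata and the nonce it will actually post with: encodeAddBatch now runs first, and the estimate only falls back to the "future tx" path when older queued transactions make latest state unreliable. A hedged sketch of the gating logic, with wiring and names that are illustrative rather than taken from the repository:

package sketch

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
)

// canUseNormalEstimation mirrors the check in the diff: if at most one
// transaction can sit in the mempool, or the account's on-chain nonce
// already equals the nonce this batch will use, nothing is queued ahead
// of us, so estimating against latest state is sound.
func canUseNormalEstimation(ctx context.Context, client *ethclient.Client, sender common.Address, realNonce, maxMempoolTxs uint64) (bool, error) {
	if maxMempoolTxs == 1 {
		return true, nil
	}
	latestNonce, err := client.NonceAt(ctx, sender, nil) // nil block number means latest
	if err != nil {
		return false, err
	}
	return latestNonce == realNonce, nil
}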
20 changes: 15 additions & 5 deletions arbnode/dataposter/data_poster.go
@@ -56,6 +56,7 @@ type DataPoster struct {
 	signer            signerFn
 	redisLock         AttemptLocker
 	config            ConfigFetcher
+	usingNoOpStorage  bool
 	replacementTimes  []time.Duration
 	metadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)

@@ -119,8 +120,9 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
 	if err != nil {
 		return nil, err
 	}
+	useNoOpStorage := cfg.UseNoOpStorage
 	if opts.HeaderReader.IsParentChainArbitrum() && !cfg.UseNoOpStorage {
-		cfg.UseNoOpStorage = true
+		useNoOpStorage = true
 		log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool")
 	}
 	encF := func() storage.EncoderDecoderInterface {
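The useNoOpStorage local fixes a subtle bug: the old code flipped cfg.UseNoOpStorage on the struct returned by the config fetcher, mutating shared (possibly hot-reloaded) configuration as a side effect. A small sketch of the hazard and the fix, with types simplified from the real ones:

package sketch

type Config struct {
	UseNoOpStorage bool
}

type ConfigFetcher func() *Config

// decideStorage copies the flag into a local before overriding it, so the
// override stays a local decision instead of leaking into the shared
// config that every other caller of fetch() observes.
func decideStorage(fetch ConfigFetcher, parentChainIsArbitrum bool) bool {
	useNoOpStorage := fetch().UseNoOpStorage
	if parentChainIsArbitrum && !useNoOpStorage {
		useNoOpStorage = true // local override only; fetch().UseNoOpStorage is untouched
	}
	return useNoOpStorage
}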
@@ -131,7 +133,7 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
 	}
 	var queue QueueStorage
 	switch {
-	case cfg.UseNoOpStorage:
+	case useNoOpStorage:
 		queue = &noop.Storage{}
 	case opts.RedisClient != nil:
 		var err error
@@ -158,6 +160,7 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
 			return opts.Auth.Signer(addr, tx)
 		},
 		config:            opts.Config,
+		usingNoOpStorage:  useNoOpStorage,
 		replacementTimes:  replacementTimes,
 		metadataRetriever: opts.MetadataRetriever,
 		queue:             queue,
@@ -252,6 +255,13 @@ func (p *DataPoster) Sender() common.Address {
 	return p.sender
 }
 
+func (p *DataPoster) MaxMempoolTransactions() uint64 {
+	if p.usingNoOpStorage {
+		return 1
+	}
+	return p.config().MaxMempoolTransactions
+}
+
 // Does basic check whether posting transaction with specified nonce would
 // result in exceeding maximum queue length or maximum transactions in mempool.
 func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) error {
@@ -398,7 +408,7 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u

 	latestBalance := p.balance
 	balanceForTx := new(big.Int).Set(latestBalance)
-	if config.AllocateMempoolBalance && !config.UseNoOpStorage {
+	if config.AllocateMempoolBalance && !p.usingNoOpStorage {
 		// We reserve half the balance for the first transaction, and then split the remaining balance for all after that.
 		// With noop storage, we don't try to replace-by-fee, so we don't need to worry about this.
 		balanceForTx.Div(balanceForTx, common.Big2)
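A worked example of the balance split described in the comment above, under the simplifying assumption that only the halving step matters (the real code applies further adjustments for transactions queued behind the first):

package sketch

import "math/big"

// firstTxBudget halves the balance, reserving the other half for
// replace-by-fee bumps and later queued transactions, matching the
// Div by common.Big2 in the diff. E.g. with a 10 ETH balance the
// first transaction may spend at most 5 ETH.
func firstTxBudget(balance *big.Int) *big.Int {
	budget := new(big.Int).Set(balance) // copy; never mutate the caller's balance
	return budget.Div(budget, big.NewInt(2))
}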
@@ -500,12 +510,12 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti
 	}
 	if err := p.client.SendTransaction(ctx, newTx.FullTx); err != nil {
 		if !strings.Contains(err.Error(), "already known") && !strings.Contains(err.Error(), "nonce too low") {
-			log.Warn("DataPoster failed to send transaction", "err", err, "nonce", newTx.FullTx.Nonce(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap())
+			log.Warn("DataPoster failed to send transaction", "err", err, "nonce", newTx.FullTx.Nonce(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap(), "gas", newTx.FullTx.Gas())
 			return err
 		}
 		log.Info("DataPoster transaction already known", "err", err, "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash())
 	} else {
-		log.Info("DataPoster sent transaction", "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash(), "feeCap", newTx.FullTx.GasFeeCap())
+		log.Info("DataPoster sent transaction", "nonce", newTx.FullTx.Nonce(), "hash", newTx.FullTx.Hash(), "feeCap", newTx.FullTx.GasFeeCap(), "tipCap", newTx.FullTx.GasTipCap(), "gas", newTx.FullTx.Gas())
 	}
 	newerTx := *newTx
 	newerTx.Sent = true
6 changes: 2 additions & 4 deletions arbnode/node.go
@@ -615,10 +615,8 @@ func createNodeImpl(
 	if err != nil {
 		return nil, err
 	}
-	if stakerObj.Strategy() == staker.WatchtowerStrategy {
-		if err := wallet.Initialize(ctx); err != nil {
-			return nil, err
-		}
+	if err := wallet.Initialize(ctx); err != nil {
+		return nil, err
 	}
 	var txValidatorSenderPtr *common.Address
 	if txOptsValidator != nil {
130 changes: 77 additions & 53 deletions broadcastclients/broadcastclients.go
@@ -140,75 +140,89 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
 		defer startSecondaryFeedTimer.Stop()
 		defer stopSecondaryFeedTimer.Stop()
 		defer primaryFeedIsDownTimer.Stop()
 
+		msgHandler := func(msg broadcaster.BroadcastFeedMessage, router *Router) error {
+			if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
+				return nil
+			}
+			if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
+				return nil
+			}
+			recentFeedItemsNew[msg.SequenceNumber] = time.Now()
+			if err := router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
+				return err
+			}
+			return nil
+		}
+		confSeqHandler := func(cs arbutil.MessageIndex, router *Router) {
+			if cs == lastConfirmed {
+				return
+			}
+			lastConfirmed = cs
+			if router.forwardConfirmationChan != nil {
+				router.forwardConfirmationChan <- cs
+			}
+		}
+
+		// Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers
 		for {
 			select {
+			// Cycle buckets to get rid of old entries
+			case <-recentFeedItemsCleanup.C:
+				recentFeedItemsOld = recentFeedItemsNew
+				recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
+			// Primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
+			case <-stopSecondaryFeedTimer.C:
+				bcs.stopSecondaryFeed()
+			default:
+			}
+
+			select {
 			case <-ctx.Done():
 				return
 
 			// Primary feeds
 			case msg := <-bcs.primaryRouter.messageChan:
 				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
 				primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
-					continue
-				}
-				if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
-					continue
-				}
-				recentFeedItemsNew[msg.SequenceNumber] = time.Now()
-				if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
+				if err := msgHandler(msg, bcs.primaryRouter); err != nil {
 					log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
 				}
 			case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
 				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
 				primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if cs == lastConfirmed {
-					continue
-				}
-				lastConfirmed = cs
-				if bcs.primaryRouter.forwardConfirmationChan != nil {
-					bcs.primaryRouter.forwardConfirmationChan <- cs
-				}
-
-			// Secondary Feeds
-			case msg := <-bcs.secondaryRouter.messageChan:
-				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
-					continue
-				}
-				if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
-					continue
-				}
-				recentFeedItemsNew[msg.SequenceNumber] = time.Now()
-				if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
-					log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
-				}
-			case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
-				startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
-				if cs == lastConfirmed {
-					continue
-				}
-				lastConfirmed = cs
-				if bcs.secondaryRouter.forwardConfirmationChan != nil {
-					bcs.secondaryRouter.forwardConfirmationChan <- cs
-				}
-
-			// Cycle buckets to get rid of old entries
-			case <-recentFeedItemsCleanup.C:
-				recentFeedItemsOld = recentFeedItemsNew
-				recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
-
-			// failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed
-			case <-startSecondaryFeedTimer.C:
-				bcs.startSecondaryFeed(ctx)
-
-			// failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
+				confSeqHandler(cs, bcs.primaryRouter)
+			// Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
 			case <-primaryFeedIsDownTimer.C:
 				stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
+			default:
+				select {
+				case <-ctx.Done():
+					return
+				// Secondary Feeds
+				case msg := <-bcs.secondaryRouter.messageChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					if err := msgHandler(msg, bcs.secondaryRouter); err != nil {
+						log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
+					}
+				case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					confSeqHandler(cs, bcs.secondaryRouter)
+
+				// primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
+				case <-stopSecondaryFeedTimer.C:
+					bcs.stopSecondaryFeed()
+				case msg := <-bcs.primaryRouter.messageChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					if err := msgHandler(msg, bcs.primaryRouter); err != nil {
+						log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
+					}
+				case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
+					startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
+					confSeqHandler(cs, bcs.primaryRouter)
+				case <-startSecondaryFeedTimer.C:
+					bcs.startSecondaryFeed(ctx)
+				case <-primaryFeedIsDownTimer.C:
+					stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
+				}
 			}
 		}
 	})
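The restructured loop above is an instance of a standard Go prioritization idiom: a select with an empty default falls through immediately, and nesting a second select inside the first one's default gives the first select's channels the first chance to be read on every iteration. A minimal standalone model of the idiom, with illustrative channel types and handlers:

package sketch

// prioritize services high before low on every pass: the first select
// never blocks (empty default), so a ready high value is always taken
// first; the second select then waits on everything, so low is only
// read when high had nothing buffered at the top of the iteration.
func prioritize(done <-chan struct{}, high, low <-chan int, handle func(int)) {
	for {
		select {
		case v := <-high:
			handle(v)
			continue // re-check high again before ever touching low
		default:
		}
		select {
		case <-done:
			return
		case v := <-high:
			handle(v)
		case v := <-low:
			handle(v)
		}
	}
}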
@@ -239,6 +253,16 @@ func (bcs *BroadcastClients) stopSecondaryFeed() {
 		bcs.secondaryClients[pos].StopAndWait()
 		bcs.secondaryClients = bcs.secondaryClients[:pos]
 		log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos])
+
+		// flush the secondary feeds' message and confirmedSequenceNumber channels
+		for {
+			select {
+			case <-bcs.secondaryRouter.messageChan:
+			case <-bcs.secondaryRouter.confirmedSequenceNumberChan:
+			default:
+				return
+			}
+		}
 	}
 }
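The flush loop added above is the standard non-blocking drain idiom: keep receiving while something is buffered, and let the empty default end the loop the instant both channels are dry, so a stopped feed's stale messages cannot be replayed when a new secondary starts. In isolation, with placeholder element types:

package sketch

// drain discards everything currently buffered on both channels and
// returns immediately once neither has a value ready; it never blocks.
func drain(msgs <-chan int, confirmations <-chan uint64) {
	for {
		select {
		case <-msgs:
		case <-confirmations:
		default:
			return
		}
	}
}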

1 change: 1 addition & 0 deletions cmd/chaininfo/arbitrum_chain_info.json
@@ -4,6 +4,7 @@
"parent-chain-id": 1,
"parent-chain-is-arbitrum": false,
"sequencer-url": "https://arb1-sequencer.arbitrum.io/rpc",
"secondary-forwarding-target": "https://arb1-sequencer-fallback-1.arbitrum.io/rpc,https://arb1-sequencer-fallback-2.arbitrum.io/rpc,https://arb1-sequencer-fallback-3.arbitrum.io/rpc,https://arb1-sequencer-fallback-4.arbitrum.io/rpc,https://arb1-sequencer-fallback-5.arbitrum.io/rpc",
"feed-url": "wss://arb1-feed.arbitrum.io/feed",
"secondary-feed-url": "wss://arb1-delayed-feed.arbitrum.io/feed,wss://arb1-feed-fallback-1.arbitrum.io/feed,wss://arb1-feed-fallback-2.arbitrum.io/feed,wss://arb1-feed-fallback-3.arbitrum.io/feed,wss://arb1-feed-fallback-4.arbitrum.io/feed,wss://arb1-feed-fallback-5.arbitrum.io/feed",
"has-genesis-state": true,
15 changes: 8 additions & 7 deletions cmd/chaininfo/chain_info.go
@@ -22,13 +22,14 @@ type ChainInfo struct {
 	ParentChainId         uint64 `json:"parent-chain-id"`
 	ParentChainIsArbitrum *bool  `json:"parent-chain-is-arbitrum"`
 	// This is the forwarding target to submit transactions to, called the sequencer URL for clarity
-	SequencerUrl     string              `json:"sequencer-url"`
-	FeedUrl          string              `json:"feed-url"`
-	SecondaryFeedUrl string              `json:"secondary-feed-url"`
-	DasIndexUrl      string              `json:"das-index-url"`
-	HasGenesisState  bool                `json:"has-genesis-state"`
-	ChainConfig      *params.ChainConfig `json:"chain-config"`
-	RollupAddresses  *RollupAddresses    `json:"rollup"`
+	SequencerUrl              string              `json:"sequencer-url"`
+	SecondaryForwardingTarget string              `json:"secondary-forwarding-target"`
+	FeedUrl                   string              `json:"feed-url"`
+	SecondaryFeedUrl          string              `json:"secondary-feed-url"`
+	DasIndexUrl               string              `json:"das-index-url"`
+	HasGenesisState           bool                `json:"has-genesis-state"`
+	ChainConfig               *params.ChainConfig `json:"chain-config"`
+	RollupAddresses           *RollupAddresses    `json:"rollup"`
 }
 
 func GetChainConfig(chainId *big.Int, chainName string, genesisBlockNum uint64, l2ChainInfoFiles []string, l2ChainInfoJson string) (*params.ChainConfig, error) {
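The new struct field rounds out the secondary-forwarding-target entry added to arbitrum_chain_info.json above; the json tags do the binding. A small, runnable sketch with trimmed-down fields and placeholder fallback URLs (the real file lists the arb1 fallbacks):

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// chainInfo mirrors a subset of the ChainInfo struct above, showing how
// the secondary-forwarding-target key binds to SecondaryForwardingTarget.
type chainInfo struct {
	SequencerUrl              string `json:"sequencer-url"`
	SecondaryForwardingTarget string `json:"secondary-forwarding-target"`
	FeedUrl                   string `json:"feed-url"`
}

func main() {
	raw := `{
		"sequencer-url": "https://arb1-sequencer.arbitrum.io/rpc",
		"secondary-forwarding-target": "https://fallback-1.example/rpc,https://fallback-2.example/rpc",
		"feed-url": "wss://arb1-feed.arbitrum.io/feed"
	}`
	var info chainInfo
	if err := json.Unmarshal([]byte(raw), &info); err != nil {
		log.Fatal(err)
	}
	fmt.Println(info.SecondaryForwardingTarget) // comma-separated list of fallback URLs
}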