diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go
index a54336fd5a..8c0fdd332a 100644
--- a/arbnode/batch_poster.go
+++ b/arbnode/batch_poster.go
@@ -29,6 +29,7 @@ import (
     "github.com/ethereum/go-ethereum/rlp"
 
     "github.com/offchainlabs/nitro/arbnode/dataposter"
+    "github.com/offchainlabs/nitro/arbnode/dataposter/storage"
     "github.com/offchainlabs/nitro/arbnode/redislock"
     "github.com/offchainlabs/nitro/arbos/arbostypes"
     "github.com/offchainlabs/nitro/arbstate"
@@ -286,7 +287,6 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
         HeaderReader:      opts.L1Reader,
         Auth:              opts.TransactOpts,
         RedisClient:       redisClient,
-        RedisLock:         redisLock,
         Config:            dataPosterConfigFetcher,
         MetadataRetriever: b.getBatchPosterPosition,
         ExtraBacklog:      b.GetBacklogEstimate,
@@ -770,6 +770,8 @@ func (b *BatchPoster) encodeAddBatch(seqNum *big.Int, prevMsgNum arbutil.Message
     return fullData, nil
 }
 
+var ErrNormalGasEstimationFailed = errors.New("normal gas estimation failed")
+
 func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64, realData []byte, realNonce uint64, realAccessList types.AccessList) (uint64, error) {
     config := b.config()
     useNormalEstimation := b.dataPoster.MaxMempoolTransactions() == 1
@@ -790,7 +792,7 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
             AccessList: realAccessList,
         })
         if err != nil {
-            return 0, err
+            return 0, fmt.Errorf("%w: %w", ErrNormalGasEstimationFailed, err)
         }
         return gas + config.ExtraBatchGas, nil
     }
@@ -830,6 +832,8 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
 
 const ethPosBlockTime = 12 * time.Second
 
+var errAttemptLockFailed = errors.New("failed to acquire lock; either another batch poster posted a batch or this node fell behind")
+
 func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
     if b.batchReverted.Load() {
         return false, fmt.Errorf("batch was reverted, not posting any more batches")
@@ -1006,6 +1010,18 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
     }
 
     if b.daWriter != nil {
+        if !b.redisLock.AttemptLock(ctx) {
+            return false, errAttemptLockFailed
+        }
+
+        gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx)
+        if err != nil {
+            return false, err
+        }
+        if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
+            return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
+        }
+
         cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
         if errors.Is(err, das.BatchToDasFailed) {
             if config.DisableDasFallbackStoreDataOnChain {
@@ -1147,8 +1163,16 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
                 batchPosterWalletBalance.Update(arbmath.BalancePerEther(walletBalance))
             }
         }
-        if !b.redisLock.AttemptLock(ctx) {
+        couldLock, err := b.redisLock.CouldAcquireLock(ctx)
+        if err != nil {
+            log.Warn("Error checking if we could acquire redis lock", "err", err)
+            // Might as well try, worst case we fail to lock
+            couldLock = true
+        }
+        if !couldLock {
+            log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind")
             b.building = nil
+            b.firstEphemeralError = time.Time{}
             return b.config().PollInterval
         }
         posted, err := b.maybePostSequencerBatch(ctx)
@@ -1156,16 +1180,32 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
             b.firstEphemeralError = time.Time{}
         }
         if err != nil {
+            if ctx.Err() != nil {
+                // Shutting down. No need to print the context canceled error.
+                return 0
+            }
             b.building = nil
             logLevel := log.Error
-            // Likely the inbox tracker just isn't caught up.
-            // Let's see if this error disappears naturally.
             if b.firstEphemeralError == (time.Time{}) {
                 b.firstEphemeralError = time.Now()
-                logLevel = log.Warn
-            } else if time.Since(b.firstEphemeralError) < time.Minute {
-                logLevel = log.Warn
-            } else if time.Since(b.firstEphemeralError) < time.Minute*5 && strings.Contains(err.Error(), "will exceed max mempool size") {
+            }
+            // Likely the inbox tracker just isn't caught up, or there's some other ephemeral error.
+            // Let's see if this error disappears naturally.
+            sinceFirstEphemeralError := time.Since(b.firstEphemeralError)
+            // If the error matches one of these, it's only logged at debug for the first minute,
+            // then at warn for the next 4 minutes, then at error. If the error isn't one of these,
+            // it'll be logged at warn for the first minute, then at error.
+            ignoreAtFirst := errors.Is(err, dataposter.ErrExceedsMaxMempoolSize) ||
+                errors.Is(err, storage.ErrStorageRace) ||
+                errors.Is(err, ErrNormalGasEstimationFailed) ||
+                errors.Is(err, AccumulatorNotFoundErr)
+            if sinceFirstEphemeralError < time.Minute {
+                if ignoreAtFirst {
+                    logLevel = log.Debug
+                } else {
+                    logLevel = log.Warn
+                }
+            } else if sinceFirstEphemeralError < time.Minute*5 && ignoreAtFirst {
                 logLevel = log.Warn
             }
             logLevel("error posting batch", "err", err)
diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go
index 059e080eea..ff5a945a17 100644
--- a/arbnode/dataposter/data_poster.go
+++ b/arbnode/dataposter/data_poster.go
@@ -57,7 +57,6 @@ type DataPoster struct {
     client           arbutil.L1Interface
     auth             *bind.TransactOpts
     signer           signerFn
-    redisLock        AttemptLocker
     config           ConfigFetcher
     usingNoOpStorage bool
     replacementTimes []time.Duration
@@ -84,10 +83,6 @@ type DataPoster struct {
 // This can be local or external, hence the context parameter.
 type signerFn func(context.Context, common.Address, *types.Transaction) (*types.Transaction, error)
 
-type AttemptLocker interface {
-    AttemptLock(context.Context) bool
-}
-
 func parseReplacementTimes(val string) ([]time.Duration, error) {
     var res []time.Duration
     var lastReplacementTime time.Duration
@@ -114,7 +109,6 @@ type DataPosterOpts struct {
     HeaderReader      *headerreader.HeaderReader
     Auth              *bind.TransactOpts
     RedisClient       redis.UniversalClient
-    RedisLock         AttemptLocker
     Config            ConfigFetcher
     MetadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)
     ExtraBacklog      func() uint64
@@ -175,7 +169,6 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
         replacementTimes:    replacementTimes,
         metadataRetriever:   opts.MetadataRetriever,
         queue:               queue,
-        redisLock:           opts.RedisLock,
         errorCount:          make(map[uint64]int),
         maxFeeCapExpression: expression,
         extraBacklog:        opts.ExtraBacklog,
@@ -288,6 +281,8 @@ func (p *DataPoster) MaxMempoolTransactions() uint64 {
     return p.config().MaxMempoolTransactions
 }
 
+var ErrExceedsMaxMempoolSize = errors.New("posting this transaction will exceed max mempool size")
+
 // Does basic check whether posting transaction with specified nonce would
 // result in exceeding maximum queue length or maximum transactions in mempool.
 func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) error {
@@ -310,7 +305,7 @@ func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) err
             return fmt.Errorf("getting nonce of a dataposter sender: %w", err)
         }
         if nextNonce >= cfg.MaxMempoolTransactions+unconfirmedNonce {
-            return fmt.Errorf("posting a transaction with nonce: %d will exceed max mempool size: %d, unconfirmed nonce: %d", nextNonce, cfg.MaxMempoolTransactions, unconfirmedNonce)
+            return fmt.Errorf("%w: transaction nonce: %d, unconfirmed nonce: %d, max mempool size: %d", ErrExceedsMaxMempoolSize, nextNonce, unconfirmedNonce, cfg.MaxMempoolTransactions)
         }
     }
     return nil
@@ -533,7 +528,7 @@ func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Tim
         return nil, err
     }
     if nonce != expectedNonce {
-        return nil, fmt.Errorf("data poster expected next transaction to have nonce %v but was requested to post transaction with nonce %v", expectedNonce, nonce)
+        return nil, fmt.Errorf("%w: data poster expected next transaction to have nonce %v but was requested to post transaction with nonce %v", storage.ErrStorageRace, expectedNonce, nonce)
     }
 
     err = p.updateBalance(ctx)
@@ -745,9 +740,6 @@ func (p *DataPoster) Start(ctxIn context.Context) {
     p.CallIteratively(func(ctx context.Context) time.Duration {
         p.mutex.Lock()
         defer p.mutex.Unlock()
-        if !p.redisLock.AttemptLock(ctx) {
-            return minWait
-        }
         err := p.updateBalance(ctx)
         if err != nil {
             log.Warn("failed to update tx poster balance", "err", err)
diff --git a/arbnode/node.go b/arbnode/node.go
index 8be2a982cf..accca10e47 100644
--- a/arbnode/node.go
+++ b/arbnode/node.go
@@ -25,7 +25,6 @@ import (
     "github.com/ethereum/go-ethereum/rpc"
     "github.com/offchainlabs/nitro/arbnode/dataposter"
     "github.com/offchainlabs/nitro/arbnode/dataposter/storage"
-    "github.com/offchainlabs/nitro/arbnode/redislock"
     "github.com/offchainlabs/nitro/arbnode/resourcemanager"
     "github.com/offchainlabs/nitro/arbutil"
     "github.com/offchainlabs/nitro/broadcastclient"
@@ -313,13 +312,6 @@ func StakerDataposter(
     if err != nil {
         return nil, fmt.Errorf("creating redis client from url: %w", err)
     }
-    lockCfgFetcher := func() *redislock.SimpleCfg {
-        return &cfg.Staker.RedisLock
-    }
-    redisLock, err := redislock.NewSimple(redisC, lockCfgFetcher, func() bool { return syncMonitor.Synced() })
-    if err != nil {
-        return nil, err
-    }
     dpCfg := func() *dataposter.DataPosterConfig {
         return &cfg.Staker.DataPoster
     }
@@ -335,7 +327,6 @@ func StakerDataposter(
         HeaderReader:      l1Reader,
         Auth:              transactOpts,
         RedisClient:       redisC,
-        RedisLock:         redisLock,
         Config:            dpCfg,
         MetadataRetriever: mdRetriever,
         RedisKey:          sender + ".staker-data-poster.queue",
diff --git a/arbnode/redislock/redis.go b/arbnode/redislock/redis.go
index c8252e059f..09296e3c79 100644
--- a/arbnode/redislock/redis.go
+++ b/arbnode/redislock/redis.go
@@ -29,6 +29,7 @@ type Simple struct {
 }
 
 type SimpleCfg struct {
+    Enable          bool          `koanf:"enable"`
     MyId            string        `koanf:"my-id"`
     LockoutDuration time.Duration `koanf:"lockout-duration" reload:"hot"`
     RefreshDuration time.Duration `koanf:"refresh-duration" reload:"hot"`
@@ -39,6 +40,7 @@ type SimpleCfg struct {
 type SimpleCfgFetcher func() *SimpleCfg
 
 func AddConfigOptions(prefix string, f *flag.FlagSet) {
+    f.Bool(prefix+".enable", DefaultCfg.Enable, "if false, always treat this as locked and don't write the lock to redis")
     f.String(prefix+".my-id", "", "this node's id prefix when acquiring the lock (optional)")
     f.Duration(prefix+".lockout-duration", DefaultCfg.LockoutDuration, "how long lock is held")
     f.Duration(prefix+".refresh-duration", DefaultCfg.RefreshDuration, "how long between consecutive calls to redis")
@@ -60,6 +62,7 @@ func NewSimple(client redis.UniversalClient, config SimpleCfgFetcher, readyToLoc
 }
 
 var DefaultCfg = SimpleCfg{
+    Enable:          true,
     LockoutDuration: time.Minute,
     RefreshDuration: time.Second * 10,
     Key:             "",
@@ -137,12 +140,33 @@ func (l *Simple) AttemptLock(ctx context.Context) bool {
 }
 
 func (l *Simple) Locked() bool {
-    if l.client == nil {
+    if l.client == nil || !l.config().Enable {
         return true
     }
     return time.Now().Before(atomicTimeRead(&l.lockedUntil))
 }
 
+// Returns true if a call to AttemptLock will likely succeed
+func (l *Simple) CouldAcquireLock(ctx context.Context) (bool, error) {
+    if l.Locked() {
+        return true, nil
+    }
+    if l.stopping || !l.readyToLock() {
+        return false, nil
+    }
+    // l.client shouldn't be nil here because Locked would've returned true
+    current, err := l.client.Get(ctx, l.config().Key).Result()
+    if errors.Is(err, redis.Nil) {
+        // Lock is free for the taking
+        return true, nil
+    }
+    if err != nil {
+        return false, err
+    }
+    // return true if the lock is free for the taking or is already ours
+    return current == "" || current == l.myId, nil
+}
+
 func (l *Simple) Release(ctx context.Context) {
     l.mutex.Lock()
     defer l.mutex.Unlock()
diff --git a/arbnode/simple_redis_lock_test.go b/arbnode/simple_redis_lock_test.go
index b7506145c3..c9dd576749 100644
--- a/arbnode/simple_redis_lock_test.go
+++ b/arbnode/simple_redis_lock_test.go
@@ -48,6 +48,7 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
     Require(t, redisClient.Del(ctx, redisKey).Err())
 
     conf := &redislock.SimpleCfg{
+        Enable:          true,
         LockoutDuration: test_delay * test_attempts * 10,
         RefreshDuration: test_delay * 2,
         Key:             redisKey,
diff --git a/arbos/block_processor.go b/arbos/block_processor.go
index 4864a31255..f1838132a8 100644
--- a/arbos/block_processor.go
+++ b/arbos/block_processor.go
@@ -42,6 +42,21 @@ var EmitReedeemScheduledEvent func(*vm.EVM, uint64, uint64, [32]byte, [32]byte,
 var EmitTicketCreatedEvent func(*vm.EVM, [32]byte) error
 var gasUsedSinceStartupCounter = metrics.NewRegisteredCounter("arb/gas_used", nil)
 
+// A helper struct that implements String() by marshalling to JSON.
+// This is useful for logging because it's lazy, so if the log level is too high to print the transaction,
+// it doesn't waste compute marshalling the transaction when the result wouldn't be used.
+type printTxAsJson struct {
+    tx *types.Transaction
+}
+
+func (p printTxAsJson) String() string {
+    json, err := p.tx.MarshalJSON()
+    if err != nil {
+        return fmt.Sprintf("[error marshalling tx: %v]", err)
+    }
+    return string(json)
+}
+
 type L1Info struct {
     poster        common.Address
     l1BlockNumber uint64
@@ -357,7 +372,11 @@ func ProduceBlockAdvanced(
             hooks.TxErrors = append(hooks.TxErrors, err)
 
             if err != nil {
-                log.Debug("error applying transaction", "tx", tx, "err", err)
+                logLevel := log.Debug
+                if chainConfig.DebugMode() {
+                    logLevel = log.Warn
+                }
+                logLevel("error applying transaction", "tx", printTxAsJson{tx}, "err", err)
                 if !hooks.DiscardInvalidTxsEarly {
                     // we'll still deduct a TxGas's worth from the block-local rate limiter even if the tx was invalid
                     blockGasLeft = arbmath.SaturatingUSub(blockGasLeft, params.TxGas)
diff --git a/staker/staker.go b/staker/staker.go
index 4f35c1bc9a..5fa6c400c3 100644
--- a/staker/staker.go
+++ b/staker/staker.go
@@ -21,7 +21,6 @@ import (
     flag "github.com/spf13/pflag"
 
     "github.com/offchainlabs/nitro/arbnode/dataposter"
-    "github.com/offchainlabs/nitro/arbnode/redislock"
     "github.com/offchainlabs/nitro/arbutil"
     "github.com/offchainlabs/nitro/cmd/genericconf"
     "github.com/offchainlabs/nitro/staker/txbuilder"
@@ -87,7 +86,6 @@ type L1ValidatorConfig struct {
     GasRefunderAddress string                      `koanf:"gas-refunder-address"`
     DataPoster         dataposter.DataPosterConfig `koanf:"data-poster" reload:"hot"`
     RedisUrl           string                      `koanf:"redis-url"`
-    RedisLock          redislock.SimpleCfg         `koanf:"redis-lock" reload:"hot"`
     ExtraGas           uint64                      `koanf:"extra-gas" reload:"hot"`
     Dangerous          DangerousConfig             `koanf:"dangerous"`
     ParentChainWallet  genericconf.WalletConfig    `koanf:"parent-chain-wallet"`
@@ -154,7 +152,6 @@ var DefaultL1ValidatorConfig = L1ValidatorConfig{
     GasRefunderAddress: "",
     DataPoster:         dataposter.DefaultDataPosterConfigForValidator,
     RedisUrl:           "",
-    RedisLock:          redislock.DefaultCfg,
     ExtraGas:           50000,
     Dangerous:          DefaultDangerousConfig,
     ParentChainWallet:  DefaultValidatorL1WalletConfig,
@@ -175,7 +172,6 @@ var TestL1ValidatorConfig = L1ValidatorConfig{
     GasRefunderAddress: "",
     DataPoster:         dataposter.TestDataPosterConfigForValidator,
     RedisUrl:           "",
-    RedisLock:          redislock.DefaultCfg,
     ExtraGas:           50000,
     Dangerous:          DefaultDangerousConfig,
     ParentChainWallet:  DefaultValidatorL1WalletConfig,
@@ -205,7 +201,6 @@ func L1ValidatorConfigAddOptions(prefix string, f *flag.FlagSet) {
     f.String(prefix+".redis-url", DefaultL1ValidatorConfig.RedisUrl, "redis url for L1 validator")
     f.Uint64(prefix+".extra-gas", DefaultL1ValidatorConfig.ExtraGas, "use this much more gas than estimation says is necessary to post transactions")
     dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfigForValidator)
-    redislock.AddConfigOptions(prefix+".redis-lock", f)
     DangerousConfigAddOptions(prefix+".dangerous", f)
     genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultL1ValidatorConfig.ParentChainWallet.Pathname)
 }
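
Note on the error handling above: the batch poster and data poster now wrap failures with sentinel errors (ErrNormalGasEstimationFailed, ErrExceedsMaxMempoolSize, storage.ErrStorageRace) using fmt.Errorf("%w: %w", ...), so the retry loop can classify them with errors.Is instead of matching message substrings like "will exceed max mempool size". The double %w form requires Go 1.20 or newer. Below is a minimal standalone sketch of the pattern; the names are stand-ins, not the actual Nitro packages.

package main

import (
	"errors"
	"fmt"
)

// Stand-in for the sentinel declared in batch_poster.go.
var errNormalGasEstimationFailed = errors.New("normal gas estimation failed")

// estimateGas mimics the wrapping pattern from the diff: keep the sentinel
// matchable while preserving the underlying cause.
func estimateGas() error {
	rpcErr := errors.New("execution reverted") // pretend the gas estimation RPC failed
	return fmt.Errorf("%w: %w", errNormalGasEstimationFailed, rpcErr)
}

func main() {
	err := estimateGas()
	fmt.Println(errors.Is(err, errNormalGasEstimationFailed)) // true
	fmt.Println(err) // normal gas estimation failed: execution reverted
}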
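The comment added to the Start loop describes the new escalation policy: errors recognized as ephemeral log at debug for the first minute, at warn until five minutes, then at error, while other errors log at warn for the first minute and at error afterwards. The following self-contained sketch restates those thresholds; the levelFor helper is hypothetical, used only to illustrate the policy.

package main

import (
	"fmt"
	"time"
)

// levelFor reproduces the escalation described in the batch poster comment.
func levelFor(sinceFirstError time.Duration, recognizedEphemeral bool) string {
	switch {
	case sinceFirstError < time.Minute && recognizedEphemeral:
		return "debug"
	case sinceFirstError < time.Minute:
		return "warn"
	case sinceFirstError < 5*time.Minute && recognizedEphemeral:
		return "warn"
	default:
		return "error"
	}
}

func main() {
	fmt.Println(levelFor(30*time.Second, true))  // debug
	fmt.Println(levelFor(2*time.Minute, true))   // warn
	fmt.Println(levelFor(10*time.Minute, false)) // error
}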
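The redis lock changes split the check in two: CouldAcquireLock is a cheap read used before the expensive batch building, and AttemptLock (the actual write) is deferred until just before the irreversible DA store call, with a GetNextNonceAndMeta re-check guarding against races. The sketch below shows that ordering under stated assumptions: pollOnce and the inline placeholders are illustrative, only the redislock.Simple methods come from the diff.

package sketch

import (
	"context"
	"errors"

	"github.com/offchainlabs/nitro/arbnode/redislock"
)

// pollOnce is an illustrative polling step, not code from the repository.
func pollOnce(ctx context.Context, lock *redislock.Simple) error {
	couldLock, err := lock.CouldAcquireLock(ctx)
	if err != nil {
		// The read failed; proceed optimistically and let AttemptLock decide.
		couldLock = true
	}
	if !couldLock {
		// Another poster holds the lock, or this node isn't ready; skip cheaply.
		return nil
	}
	// ... build the batch here (the expensive part) ...
	if !lock.AttemptLock(ctx) {
		// Lost the race after building; abort before doing anything irreversible.
		return errors.New("failed to acquire lock after building batch")
	}
	// ... store to the DA layer and post the batch ...
	return nil
}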
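printTxAsJson leans on the logger rendering its values lazily, as its comment notes: String() runs only when the record is actually formatted, so a filtered-out debug line never pays for MarshalJSON. Below is a generic standalone version of the same idea, with a hypothetical lazyJSON type and encoding/json standing in for the transaction's own marshaller.

package main

import (
	"encoding/json"
	"fmt"
)

// lazyJSON mirrors the printTxAsJson idea for arbitrary values; the name and
// generic form are illustrative, not part of the patch.
type lazyJSON struct {
	v any
}

// String marshals only when something actually formats the value.
func (l lazyJSON) String() string {
	b, err := json.Marshal(l.v)
	if err != nil {
		return fmt.Sprintf("[error marshalling value: %v]", err)
	}
	return string(b)
}

func main() {
	tx := map[string]any{"nonce": 7, "gas": 21000}
	// Handing lazyJSON{tx} to a logger defers the JSON work; if the record is
	// filtered out, String is never called. Printing it here forces the marshalling:
	fmt.Println(lazyJSON{tx}) // {"gas":21000,"nonce":7}
}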