Merge branch 'master' into metric-jit-wasm-memory
joshuacolvin0 authored Jan 5, 2024
2 parents d0cd917 + 2142d5f commit ef9b15e
Showing 43 changed files with 678 additions and 642 deletions.
32 changes: 0 additions & 32 deletions Makefile
@@ -307,38 +307,6 @@ contracts/test/prover/proofs/%.json: $(arbitrator_cases)/%.wasm $(arbitrator_pro
# strategic rules to minimize dependency building

.make/lint: $(DEP_PREDICATE) build-node-deps $(ORDER_ONLY_PREDICATE) .make
go run ./linter/recursivelock ./...
go run ./linter/comparesame ./...

# Disabled since we have a lot of uses of the math/rand package.
# We should probably move to crypto/rand at some point even though most of
# our uses don't seem to be security sensitive.
# TODO fix this and enable.
# go run ./linter/cryptorand ./...

# This yields a lot of legitimate warnings, most of which in practice would
# probably never happen.
# TODO fix this and enable.
# go run ./linter/errcheck ./...

go run ./linter/featureconfig ./...

# Disabled since we have high cognitive complexity in several places.
# TODO fix this and enable.
# go run ./linter/gocognit ./...

go run ./linter/ineffassign ./...
go run ./linter/interfacechecker ./...
go run ./linter/logruswitherror ./...

go run ./linter/shadowpredecl ./...
go run ./linter/slicedirect ./...

# Disabled since it fails in many places, although the ones I looked into seem
# to be false positives logically.
# TODO fix this and enable and mark false positives with lint ignore.
# go run ./linter/uintcast ./...

go run ./linter/koanf ./...
go run ./linter/pointercheck ./...
golangci-lint run --fix
58 changes: 49 additions & 9 deletions arbnode/batch_poster.go
@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
@@ -286,7 +287,6 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
HeaderReader: opts.L1Reader,
Auth: opts.TransactOpts,
RedisClient: redisClient,
RedisLock: redisLock,
Config: dataPosterConfigFetcher,
MetadataRetriever: b.getBatchPosterPosition,
ExtraBacklog: b.GetBacklogEstimate,
@@ -770,6 +770,8 @@ func (b *BatchPoster) encodeAddBatch(seqNum *big.Int, prevMsgNum arbutil.Message
return fullData, nil
}

var ErrNormalGasEstimationFailed = errors.New("normal gas estimation failed")

func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64, realData []byte, realNonce uint64, realAccessList types.AccessList) (uint64, error) {
config := b.config()
useNormalEstimation := b.dataPoster.MaxMempoolTransactions() == 1
@@ -790,7 +792,7 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
AccessList: realAccessList,
})
if err != nil {
return 0, err
return 0, fmt.Errorf("%w: %w", ErrNormalGasEstimationFailed, err)
}
return gas + config.ExtraBatchGas, nil
}
@@ -830,6 +832,8 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,

const ethPosBlockTime = 12 * time.Second

var errAttemptLockFailed = errors.New("failed to acquire lock; either another batch poster posted a batch or this node fell behind")

func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
if b.batchReverted.Load() {
return false, fmt.Errorf("batch was reverted, not posting any more batches")
@@ -1006,6 +1010,18 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

if b.daWriter != nil {
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}

gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
}
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
@@ -1147,25 +1163,49 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
batchPosterWalletBalance.Update(arbmath.BalancePerEther(walletBalance))
}
}
if !b.redisLock.AttemptLock(ctx) {
couldLock, err := b.redisLock.CouldAcquireLock(ctx)
if err != nil {
log.Warn("Error checking if we could acquire redis lock", "err", err)
// Might as well try, worst case we fail to lock
couldLock = true
}
if !couldLock {
log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind")
b.building = nil
b.firstEphemeralError = time.Time{}
return b.config().PollInterval
}
posted, err := b.maybePostSequencerBatch(ctx)
if err == nil {
b.firstEphemeralError = time.Time{}
}
if err != nil {
if ctx.Err() != nil {
// Shutting down. No need to print the context canceled error.
return 0
}
b.building = nil
logLevel := log.Error
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
logLevel = log.Warn
} else if time.Since(b.firstEphemeralError) < time.Minute {
logLevel = log.Warn
} else if time.Since(b.firstEphemeralError) < time.Minute*5 && strings.Contains(err.Error(), "will exceed max mempool size") {
}
// Likely the inbox tracker just isn't caught up, or there's some other ephemeral error.
// Let's see if this error disappears naturally.
sinceFirstEphemeralError := time.Since(b.firstEphemeralError)
// If the error matches one of these, it's only logged at debug for the first minute,
// then at warn for the next 4 minutes, then at error. If the error isn't one of these,
// it'll be logged at warn for the first minute, then at error.
ignoreAtFirst := errors.Is(err, dataposter.ErrExceedsMaxMempoolSize) ||
errors.Is(err, storage.ErrStorageRace) ||
errors.Is(err, ErrNormalGasEstimationFailed) ||
errors.Is(err, AccumulatorNotFoundErr)
if sinceFirstEphemeralError < time.Minute {
if ignoreAtFirst {
logLevel = log.Debug
} else {
logLevel = log.Warn
}
} else if sinceFirstEphemeralError < time.Minute*5 && ignoreAtFirst {
logLevel = log.Warn
}
logLevel("error posting batch", "err", err)
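
The batch poster's error handling in Start now classifies failures with errors.Is against wrapped sentinel errors and escalates the log level over time. The standalone sketch below is not part of this commit; it just restates those escalation rules as a small function, and the helper name pickLogLevel and the local sentinel variables are illustrative assumptions (only two of the four sentinels from the diff are included, for brevity).

package main

import (
	"errors"
	"fmt"
	"time"
)

// Two of the sentinel errors the batch poster checks with errors.Is
// (the diff also checks storage.ErrStorageRace and AccumulatorNotFoundErr).
var (
	errExceedsMaxMempoolSize     = errors.New("posting this transaction will exceed max mempool size")
	errNormalGasEstimationFailed = errors.New("normal gas estimation failed")
)

type logLevel int

const (
	levelDebug logLevel = iota
	levelWarn
	levelError
)

// pickLogLevel is a hypothetical helper restating the escalation rules from
// BatchPoster.Start: recognized ephemeral errors are logged at debug for the
// first minute, at warn until five minutes, then at error; other errors are
// logged at warn for the first minute, then at error.
func pickLogLevel(err error, sinceFirstEphemeralError time.Duration) logLevel {
	ignoreAtFirst := errors.Is(err, errExceedsMaxMempoolSize) ||
		errors.Is(err, errNormalGasEstimationFailed)
	switch {
	case sinceFirstEphemeralError < time.Minute && ignoreAtFirst:
		return levelDebug
	case sinceFirstEphemeralError < time.Minute:
		return levelWarn
	case sinceFirstEphemeralError < 5*time.Minute && ignoreAtFirst:
		return levelWarn
	default:
		return levelError
	}
}

func main() {
	err := fmt.Errorf("%w: transaction nonce: 7", errExceedsMaxMempoolSize)
	fmt.Println(pickLogLevel(err, 30*time.Second)) // 0 (debug)
	fmt.Println(pickLogLevel(err, 2*time.Minute))  // 1 (warn)
	fmt.Println(pickLogLevel(err, 10*time.Minute)) // 2 (error)
}

Wrapping with %w is what lets errors.Is still match after fmt.Errorf adds context such as the nonce values.
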
16 changes: 4 additions & 12 deletions arbnode/dataposter/data_poster.go
@@ -57,7 +57,6 @@ type DataPoster struct {
client arbutil.L1Interface
auth *bind.TransactOpts
signer signerFn
redisLock AttemptLocker
config ConfigFetcher
usingNoOpStorage bool
replacementTimes []time.Duration
@@ -84,10 +83,6 @@ type DataPoster struct {
// This can be local or external, hence the context parameter.
type signerFn func(context.Context, common.Address, *types.Transaction) (*types.Transaction, error)

type AttemptLocker interface {
AttemptLock(context.Context) bool
}

func parseReplacementTimes(val string) ([]time.Duration, error) {
var res []time.Duration
var lastReplacementTime time.Duration
@@ -114,7 +109,6 @@ type DataPosterOpts struct {
HeaderReader *headerreader.HeaderReader
Auth *bind.TransactOpts
RedisClient redis.UniversalClient
RedisLock AttemptLocker
Config ConfigFetcher
MetadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)
ExtraBacklog func() uint64
@@ -175,7 +169,6 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
replacementTimes: replacementTimes,
metadataRetriever: opts.MetadataRetriever,
queue: queue,
redisLock: opts.RedisLock,
errorCount: make(map[uint64]int),
maxFeeCapExpression: expression,
extraBacklog: opts.ExtraBacklog,
@@ -288,6 +281,8 @@ func (p *DataPoster) MaxMempoolTransactions() uint64 {
return p.config().MaxMempoolTransactions
}

var ErrExceedsMaxMempoolSize = errors.New("posting this transaction will exceed max mempool size")

// Does basic check whether posting transaction with specified nonce would
// result in exceeding maximum queue length or maximum transactions in mempool.
func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) error {
@@ -310,7 +305,7 @@ func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) err
return fmt.Errorf("getting nonce of a dataposter sender: %w", err)
}
if nextNonce >= cfg.MaxMempoolTransactions+unconfirmedNonce {
return fmt.Errorf("posting a transaction with nonce: %d will exceed max mempool size: %d, unconfirmed nonce: %d", nextNonce, cfg.MaxMempoolTransactions, unconfirmedNonce)
return fmt.Errorf("%w: transaction nonce: %d, unconfirmed nonce: %d, max mempool size: %d", ErrExceedsMaxMempoolSize, nextNonce, unconfirmedNonce, cfg.MaxMempoolTransactions)
}
}
return nil
@@ -533,7 +528,7 @@ func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Tim
return nil, err
}
if nonce != expectedNonce {
return nil, fmt.Errorf("data poster expected next transaction to have nonce %v but was requested to post transaction with nonce %v", expectedNonce, nonce)
return nil, fmt.Errorf("%w: data poster expected next transaction to have nonce %v but was requested to post transaction with nonce %v", storage.ErrStorageRace, expectedNonce, nonce)
}

err = p.updateBalance(ctx)
@@ -745,9 +740,6 @@ func (p *DataPoster) Start(ctxIn context.Context) {
p.CallIteratively(func(ctx context.Context) time.Duration {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.redisLock.AttemptLock(ctx) {
return minWait
}
err := p.updateBalance(ctx)
if err != nil {
log.Warn("failed to update tx poster balance", "err", err)
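
The data poster changes follow the same pattern: export a sentinel error (ErrExceedsMaxMempoolSize, plus reuse of storage.ErrStorageRace), wrap it with %w so the message keeps its specifics, and let callers classify the failure with errors.Is. Below is a minimal, self-contained sketch of that pattern; the simplified function and the numbers are invented for illustration and are not taken from the real code.

package main

import (
	"errors"
	"fmt"
)

// Sentinel error, analogous to dataposter.ErrExceedsMaxMempoolSize.
var ErrExceedsMaxMempoolSize = errors.New("posting this transaction will exceed max mempool size")

// canPost mimics the shape of canPostWithNonce: it wraps the sentinel with %w,
// so the returned error both matches errors.Is and still carries the numbers.
func canPost(nextNonce, unconfirmedNonce, maxMempool uint64) error {
	if nextNonce >= maxMempool+unconfirmedNonce {
		return fmt.Errorf("%w: transaction nonce: %d, unconfirmed nonce: %d, max mempool size: %d",
			ErrExceedsMaxMempoolSize, nextNonce, unconfirmedNonce, maxMempool)
	}
	return nil
}

func main() {
	err := canPost(25, 10, 10)
	fmt.Println(err)                                      // full message with the numbers
	fmt.Println(errors.Is(err, ErrExceedsMaxMempoolSize)) // true: callers can branch on identity
}

The design choice is that callers branch on the error's identity rather than matching substrings such as "will exceed max mempool size", which is what the batch poster previously did.
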
13 changes: 1 addition & 12 deletions arbnode/node.go
@@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
@@ -313,13 +312,6 @@ func StakerDataposter(
if err != nil {
return nil, fmt.Errorf("creating redis client from url: %w", err)
}
lockCfgFetcher := func() *redislock.SimpleCfg {
return &cfg.Staker.RedisLock
}
redisLock, err := redislock.NewSimple(redisC, lockCfgFetcher, func() bool { return syncMonitor.Synced() })
if err != nil {
return nil, err
}
dpCfg := func() *dataposter.DataPosterConfig {
return &cfg.Staker.DataPoster
}
@@ -335,7 +327,6 @@ func StakerDataposter(
HeaderReader: l1Reader,
Auth: transactOpts,
RedisClient: redisC,
RedisLock: redisLock,
Config: dpCfg,
MetadataRetriever: mdRetriever,
RedisKey: sender + ".staker-data-poster.queue",
@@ -740,8 +731,6 @@ func CreateNode(
}

func (n *Node) Start(ctx context.Context) error {
// config is the static config at start, not a dynamic config
config := n.configFetcher.Get()
execClient, ok := n.Execution.(*gethexec.ExecutionNode)
if !ok {
execClient = nil
@@ -773,7 +762,7 @@ func (n *Node) Start(ctx context.Context) error {
return fmt.Errorf("error initializing feed broadcast server: %w", err)
}
}
if n.InboxTracker != nil && n.BroadcastServer != nil && config.Sequencer {
if n.InboxTracker != nil && n.BroadcastServer != nil {
// Even if the sequencer coordinator will populate this backlog,
// we want to make sure it's populated before any clients connect.
err = n.InboxTracker.PopulateFeedBacklog(n.BroadcastServer)
26 changes: 25 additions & 1 deletion arbnode/redislock/redis.go
@@ -29,6 +29,7 @@ type Simple struct {
}

type SimpleCfg struct {
Enable bool `koanf:"enable"`
MyId string `koanf:"my-id"`
LockoutDuration time.Duration `koanf:"lockout-duration" reload:"hot"`
RefreshDuration time.Duration `koanf:"refresh-duration" reload:"hot"`
@@ -39,6 +40,7 @@ type SimpleCfg struct {
type SimpleCfgFetcher func() *SimpleCfg

func AddConfigOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultCfg.Enable, "if false, always treat this as locked and don't write the lock to redis")
f.String(prefix+".my-id", "", "this node's id prefix when acquiring the lock (optional)")
f.Duration(prefix+".lockout-duration", DefaultCfg.LockoutDuration, "how long lock is held")
f.Duration(prefix+".refresh-duration", DefaultCfg.RefreshDuration, "how long between consecutive calls to redis")
@@ -60,6 +62,7 @@ func NewSimple(client redis.UniversalClient, config SimpleCfgFetcher, readyToLoc
}

var DefaultCfg = SimpleCfg{
Enable: true,
LockoutDuration: time.Minute,
RefreshDuration: time.Second * 10,
Key: "",
@@ -137,12 +140,33 @@ func (l *Simple) AttemptLock(ctx context.Context) bool {
}

func (l *Simple) Locked() bool {
if l.client == nil {
if l.client == nil || !l.config().Enable {
return true
}
return time.Now().Before(atomicTimeRead(&l.lockedUntil))
}

// Returns true if a call to AttemptLock will likely succeed
func (l *Simple) CouldAcquireLock(ctx context.Context) (bool, error) {
if l.Locked() {
return true, nil
}
if l.stopping || !l.readyToLock() {
return false, nil
}
// l.client shouldn't be nil here because Locked would've returned true
current, err := l.client.Get(ctx, l.config().Key).Result()
if errors.Is(err, redis.Nil) {
// Lock is free for the taking
return true, nil
}
if err != nil {
return false, err
}
// return true if the lock is free for the taking or is already ours
return current == "" || current == l.myId, nil
}

func (l *Simple) Release(ctx context.Context) {
l.mutex.Lock()
defer l.mutex.Unlock()
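
CouldAcquireLock gives callers a cheap, read-only way to decide whether attempting the lock is worthwhile, while AttemptLock remains the call that actually takes it; the batch poster now checks the former before building a batch and the latter right before storing to the DAS. The sketch below shows that two-phase usage against an assumed minimal interface; locker, fakeLock, and postIfLeader are illustrative stand-ins invented for this example, not the real nitro types.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// locker is an assumed minimal view of redislock.Simple for this sketch; the
// real type has more methods and takes its configuration from koanf.
type locker interface {
	CouldAcquireLock(ctx context.Context) (bool, error)
	AttemptLock(ctx context.Context) bool
}

var errLockLost = errors.New("failed to acquire lock; another poster got there first")

// postIfLeader shows the two-phase pattern: a cheap read-only check before
// building anything, then the real lock attempt right before the
// irreversible action.
func postIfLeader(ctx context.Context, l locker, buildAndPost func() error) (time.Duration, error) {
	couldLock, err := l.CouldAcquireLock(ctx)
	if err != nil {
		// If the check itself fails, try anyway; worst case AttemptLock fails below.
		couldLock = true
	}
	if !couldLock {
		// Another poster holds the lock; back off without doing any work.
		return 10 * time.Second, nil
	}
	if !l.AttemptLock(ctx) {
		return 0, errLockLost
	}
	return 0, buildAndPost()
}

// fakeLock is a stand-in so the sketch runs without redis.
type fakeLock struct{ free bool }

func (f fakeLock) CouldAcquireLock(ctx context.Context) (bool, error) { return f.free, nil }
func (f fakeLock) AttemptLock(ctx context.Context) bool               { return f.free }

func main() {
	wait, err := postIfLeader(context.Background(), fakeLock{free: true}, func() error {
		fmt.Println("building and posting batch")
		return nil
	})
	fmt.Println(wait, err)
}

With the new enable flag set to false, Locked() always reports true and nothing is written to redis, so a disabled lock behaves as if this node already holds it.
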
2 changes: 2 additions & 0 deletions arbnode/seq_coordinator.go
@@ -650,6 +650,8 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Warn("failed sequencing delayed messages after catching lock", "err", err)
}
}
// This should be redundant now that even non-primary sequencers broadcast over the feed,
// but the backlog efficiently deduplicates messages, so better safe than sorry.
err = c.streamer.PopulateFeedBacklog()
if err != nil {
log.Warn("failed to populate the feed backlog on lockout acquisition", "err", err)
1 change: 1 addition & 0 deletions arbnode/simple_redis_lock_test.go
@@ -48,6 +48,7 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
Require(t, redisClient.Del(ctx, redisKey).Err())

conf := &redislock.SimpleCfg{
Enable: true,
LockoutDuration: test_delay * test_attempts * 10,
RefreshDuration: test_delay * 2,
Key: redisKey,
12 changes: 6 additions & 6 deletions arbnode/transaction_streamer.go
@@ -862,12 +862,6 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex
return err
}

if s.broadcastServer != nil {
if err := s.broadcastServer.BroadcastSingle(msgWithMeta, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

return nil
}

@@ -927,6 +921,12 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [
default:
}

if s.broadcastServer != nil {
if err := s.broadcastServer.BroadcastMessages(messages, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

return nil
}

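
Moving the broadcast out of WriteMessageFromSequencer and into writeMessages means every path that persists messages also feeds the feed server, and it broadcasts once per batch via BroadcastMessages rather than once per message. The rough sketch below shows that shape; messageWithMeta, broadcaster, and streamer are placeholder types invented for this example and do not match the real arbostypes or broadcaster APIs.

package main

import "fmt"

// messageWithMeta is a placeholder for the real message-with-metadata type.
type messageWithMeta struct{ payload string }

// broadcaster is an assumed minimal interface modeled on the feed broadcaster.
type broadcaster interface {
	BroadcastMessages(msgs []messageWithMeta, startPos uint64) error
}

type streamer struct {
	store           []messageWithMeta
	broadcastServer broadcaster // may be nil when the feed is disabled
}

// writeMessages persists a contiguous run of messages starting at pos and then
// broadcasts the whole slice in one call, mirroring the new single choke point.
func (s *streamer) writeMessages(pos uint64, msgs []messageWithMeta) error {
	s.store = append(s.store, msgs...) // stand-in for the real database write
	if s.broadcastServer != nil {
		if err := s.broadcastServer.BroadcastMessages(msgs, pos); err != nil {
			// Broadcast failures are logged, not returned, so persistence wins.
			fmt.Println("failed broadcasting message", "pos", pos, "err", err)
		}
	}
	return nil
}

type printBroadcaster struct{}

func (printBroadcaster) BroadcastMessages(msgs []messageWithMeta, startPos uint64) error {
	fmt.Printf("broadcasting %d messages starting at %d\n", len(msgs), startPos)
	return nil
}

func main() {
	s := &streamer{broadcastServer: printBroadcaster{}}
	_ = s.writeMessages(0, []messageWithMeta{{"a"}, {"b"}})
}

As in the diff, broadcast failures are logged and swallowed so a feed problem never blocks persisting messages.
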
(Diffs for the remaining changed files are not shown.)