diff --git a/arbitrator/jit/src/syscall.rs b/arbitrator/jit/src/syscall.rs index 4cd0363b49..c81641a7f8 100644 --- a/arbitrator/jit/src/syscall.rs +++ b/arbitrator/jit/src/syscall.rs @@ -306,10 +306,10 @@ pub fn js_value_index(mut env: WasmEnvMut, sp: u32) { pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape { let Some(resume) = env.data().exports.resume.clone() else { - return Escape::failure(format!("wasmer failed to bind {}", "resume".red())) + return Escape::failure(format!("wasmer failed to bind {}", "resume".red())); }; let Some(get_stack_pointer) = env.data().exports.get_stack_pointer.clone() else { - return Escape::failure(format!("wasmer failed to bind {}", "getsp".red())) + return Escape::failure(format!("wasmer failed to bind {}", "getsp".red())); }; let sp = GoStack::simple(sp, &env); let data = env.data_mut(); diff --git a/arbitrator/prover/src/machine.rs b/arbitrator/prover/src/machine.rs index fff9c0f3d8..d5a9c52d92 100644 --- a/arbitrator/prover/src/machine.rs +++ b/arbitrator/prover/src/machine.rs @@ -651,7 +651,7 @@ pub struct MachineState<'a> { initial_hash: Bytes32, } -pub type PreimageResolver = Arc Option>; +pub type PreimageResolver = Arc Option + Send + Sync>; /// Wraps a preimage resolver to provide an easier API /// and cache the last preimage retrieved. diff --git a/arbitrator/prover/src/utils.rs b/arbitrator/prover/src/utils.rs index 6c11e9af05..e86ea96768 100644 --- a/arbitrator/prover/src/utils.rs +++ b/arbitrator/prover/src/utils.rs @@ -158,6 +158,13 @@ impl From<&[u8]> for CBytes { } } +// There's no thread safety concerns for CBytes. +// This type is basically a Box<[u8]> (which is Send + Sync) with libc as an allocator. +// Any data races between threads are prevented by Rust borrowing rules, +// and the data isn't thread-local so there's no concern moving it between threads. 
+unsafe impl Send for CBytes {} +unsafe impl Sync for CBytes {} + #[derive(Serialize, Deserialize)] #[serde(remote = "Type")] enum RemoteType { diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index e9a1663741..8a69bf13ef 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -56,20 +56,20 @@ type batchPosterPosition struct { type BatchPoster struct { stopwaiter.StopWaiter - l1Reader *headerreader.HeaderReader - inbox *InboxTracker - streamer *TransactionStreamer - config BatchPosterConfigFetcher - seqInbox *bridgegen.SequencerInbox - bridge *bridgegen.Bridge - syncMonitor *SyncMonitor - seqInboxABI *abi.ABI - seqInboxAddr common.Address - building *buildingBatch - daWriter das.DataAvailabilityServiceWriter - dataPoster *dataposter.DataPoster - redisLock *redislock.Simple - firstAccErr time.Time // first time a continuous missing accumulator occurred + l1Reader *headerreader.HeaderReader + inbox *InboxTracker + streamer *TransactionStreamer + config BatchPosterConfigFetcher + seqInbox *bridgegen.SequencerInbox + bridge *bridgegen.Bridge + syncMonitor *SyncMonitor + seqInboxABI *abi.ABI + seqInboxAddr common.Address + building *buildingBatch + daWriter das.DataAvailabilityServiceWriter + dataPoster *dataposter.DataPoster + redisLock *redislock.Simple + firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. backlog uint64 @@ -309,8 +309,8 @@ func (b *BatchPoster) pollForReverts(ctx context.Context) { // - polling is through context, or // - we see a transaction in the block from dataposter that was reverted. 
select { - case h, closed := <-headerCh: - if closed { + case h, ok := <-headerCh: + if !ok { log.Info("L1 headers channel has been closed") return } @@ -787,7 +787,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if err != nil { return false, err } - blockNumberWithPadding := arbmath.SaturatingUAdd(arbmath.BigToUintSaturating(latestHeader.Number), uint64(config.L1BlockBoundBypass/ethPosBlockTime)) + latestBlockNumber := arbutil.ParentHeaderToL1BlockNumber(latestHeader) + blockNumberWithPadding := arbmath.SaturatingUAdd(latestBlockNumber, uint64(config.L1BlockBoundBypass/ethPosBlockTime)) timestampWithPadding := arbmath.SaturatingUAdd(latestHeader.Time, uint64(config.L1BlockBoundBypass/time.Second)) l1BoundMinBlockNumber = arbmath.SaturatingUSub(blockNumberWithPadding, arbmath.BigToUintSaturating(maxTimeVariation.DelayBlocks)) @@ -969,20 +970,22 @@ func (b *BatchPoster) Start(ctxIn context.Context) { return b.config().PollInterval } posted, err := b.maybePostSequencerBatch(ctx) + ephemeralError := errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) + if !ephemeralError { + b.firstEphemeralError = time.Time{} + } if err != nil { b.building = nil logLevel := log.Error - if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) { + if ephemeralError { // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. 
- if b.firstAccErr == (time.Time{}) { - b.firstAccErr = time.Now() + if b.firstEphemeralError == (time.Time{}) { + b.firstEphemeralError = time.Now() logLevel = log.Debug - } else if time.Since(b.firstAccErr) < time.Minute { + } else if time.Since(b.firstEphemeralError) < time.Minute { logLevel = log.Debug } - } else { - b.firstAccErr = time.Time{} } logLevel("error posting batch", "err", err) return b.config().ErrorDelay diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index b1db655d71..dff2602cac 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -101,17 +101,23 @@ func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, a initConfig.UseNoOpStorage = true log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool") } + encF := func() storage.EncoderDecoderInterface { + if config().LegacyStorageEncoding { + return &storage.LegacyEncoderDecoder{} + } + return &storage.EncoderDecoder{} + } var queue QueueStorage switch { case initConfig.UseNoOpStorage: queue = &noop.Storage{} case initConfig.UseLevelDB: - queue = leveldb.New(db) + queue = leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} }) case redisClient == nil: - queue = slice.NewStorage() + queue = slice.NewStorage(func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} }) default: var err error - queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner) + queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner, encF) if err != nil { return nil, err } @@ -162,11 +168,14 @@ func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) err return nil } +func (p *DataPoster) waitForL1Finality() bool { + return p.config().WaitForL1Finality && !p.headerReader.IsParentChainArbitrum() +} + // Requires the caller hold the mutex. 
// Returns the next nonce, its metadata if stored, a bool indicating if the metadata is present, and an error. // Unlike GetNextNonceAndMeta, this does not call the metadataRetriever if the metadata is not stored in the queue. func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []byte, bool, error) { - config := p.config() // Ensure latest finalized block state is available. blockNum, err := p.client.BlockNumber(ctx) if err != nil { @@ -174,7 +183,7 @@ func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []by } lastQueueItem, err := p.queue.FetchLast(ctx) if err != nil { - return 0, nil, false, err + return 0, nil, false, fmt.Errorf("fetching last element from queue: %w", err) } if lastQueueItem != nil { nextNonce := lastQueueItem.Data.Nonce + 1 @@ -185,7 +194,7 @@ func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []by } if err := p.updateNonce(ctx); err != nil { - if !p.queue.IsPersistent() && config.WaitForL1Finality { + if !p.queue.IsPersistent() && p.waitForL1Finality() { return 0, nil, false, fmt.Errorf("error getting latest finalized nonce (and queue is not persistent): %w", err) } // Fall back to using a recent block to get the nonce. This is safe because there's nothing in the queue. 
@@ -364,7 +373,10 @@ func (p *DataPoster) saveTx(ctx context.Context, prevTx, newTx *storage.QueuedTr if prevTx != nil && prevTx.Data.Nonce != newTx.Data.Nonce { return fmt.Errorf("prevTx nonce %v doesn't match newTx nonce %v", prevTx.Data.Nonce, newTx.Data.Nonce) } - return p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx) + if err := p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx); err != nil { + return fmt.Errorf("putting new tx in the queue: %w", err) + } + return nil } func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransaction, newTx *storage.QueuedTransaction) error { @@ -433,7 +445,7 @@ func (p *DataPoster) replaceTx(ctx context.Context, prevTx *storage.QueuedTransa // The mutex must be held by the caller. func (p *DataPoster) updateNonce(ctx context.Context) error { var blockNumQuery *big.Int - if p.config().WaitForL1Finality { + if p.waitForL1Finality() { blockNumQuery = big.NewInt(int64(rpc.FinalizedBlockNumber)) } header, err := p.client.HeaderByNumber(ctx, blockNumQuery) @@ -546,7 +558,7 @@ func (p *DataPoster) Start(ctxIn context.Context) { // replacing them by fee. 
queueContents, err := p.queue.FetchContents(ctx, unconfirmedNonce, maxTxsToRbf) if err != nil { - log.Error("Failed to get tx queue contents", "err", err) + log.Error("Failed to fetch tx queue contents", "err", err) return minWait } for index, tx := range queueContents { @@ -602,20 +614,23 @@ type QueueStorage interface { } type DataPosterConfig struct { - RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"` - ReplacementTimes string `koanf:"replacement-times"` - WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` - MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` - MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` - TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` - UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` - MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` - MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` - MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"` - NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"` - AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"` - UseLevelDB bool `koanf:"use-leveldb"` - UseNoOpStorage bool `koanf:"use-noop-storage"` + RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"` + ReplacementTimes string `koanf:"replacement-times"` + // This is forcibly disabled if the parent chain is an Arbitrum chain, + // so you should probably use DataPoster's waitForL1Finality method instead of reading this field directly. 
+ WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"` + MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"` + MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"` + TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"` + UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` + MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` + MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` + MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"` + NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"` + AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"` + UseLevelDB bool `koanf:"use-leveldb"` + UseNoOpStorage bool `koanf:"use-noop-storage"` + LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"` } // ConfigFetcher function type is used instead of directly passing config so @@ -636,6 +651,7 @@ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".allocate-mempool-balance", DefaultDataPosterConfig.AllocateMempoolBalance, "if true, don't put transactions in the mempool that spend a total greater than the batch poster's balance") f.Bool(prefix+".use-leveldb", DefaultDataPosterConfig.UseLevelDB, "uses leveldb when enabled") f.Bool(prefix+".use-noop-storage", DefaultDataPosterConfig.UseNoOpStorage, "uses noop storage, it doesn't store anything") + f.Bool(prefix+".legacy-storage-encoding", DefaultDataPosterConfig.LegacyStorageEncoding, "encodes items in a legacy way (as it was before dropping generics)") signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f) } @@ -651,6 +667,7 @@ var DefaultDataPosterConfig = DataPosterConfig{ AllocateMempoolBalance: true, UseLevelDB: false, UseNoOpStorage: false, + LegacyStorageEncoding: true, } var DefaultDataPosterConfigForValidator = func() DataPosterConfig { diff --git a/arbnode/dataposter/leveldb/leveldb.go b/arbnode/dataposter/leveldb/leveldb.go 
index e41a8665a6..cfb34b04f7 100644 --- a/arbnode/dataposter/leveldb/leveldb.go +++ b/arbnode/dataposter/leveldb/leveldb.go @@ -12,14 +12,14 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" - "github.com/ethereum/go-ethereum/rlp" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/syndtr/goleveldb/leveldb" ) // Storage implements leveldb based storage for batch poster. type Storage struct { - db ethdb.Database + db ethdb.Database + encDec storage.EncoderDecoderF } var ( @@ -31,16 +31,8 @@ var ( countKey = []byte(".count_key") ) -func New(db ethdb.Database) *Storage { - return &Storage{db: db} -} - -func (s *Storage) decodeItem(data []byte) (*storage.QueuedTransaction, error) { - var item storage.QueuedTransaction - if err := rlp.DecodeBytes(data, &item); err != nil { - return nil, fmt.Errorf("decoding item: %w", err) - } - return &item, nil +func New(db ethdb.Database, enc storage.EncoderDecoderF) *Storage { + return &Storage{db: db, encDec: enc} } func idxToKey(idx uint64) []byte { @@ -55,7 +47,7 @@ func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResu if !it.Next() { break } - item, err := s.decodeItem(it.Value()) + item, err := s.encDec().Decode(it.Value()) if err != nil { return nil, err } @@ -84,7 +76,7 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er if err != nil { return nil, err } - return s.decodeItem(val) + return s.encDec().Decode(val) } func (s *Storage) Prune(ctx context.Context, until uint64) error { @@ -117,7 +109,7 @@ func (s *Storage) valueAt(_ context.Context, key []byte) ([]byte, error) { val, err := s.db.Get(key) if err != nil { if isErrNotFound(err) { - return rlp.EncodeToBytes((*storage.QueuedTransaction)(nil)) + return s.encDec().Encode((*storage.QueuedTransaction)(nil)) } return nil, err } @@ -130,14 +122,14 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu if err != nil { 
return err } - prevEnc, err := rlp.EncodeToBytes(prev) + prevEnc, err := s.encDec().Encode(prev) if err != nil { return fmt.Errorf("encoding previous item: %w", err) } if !bytes.Equal(stored, prevEnc) { return fmt.Errorf("replacing different item than expected at index: %v, stored: %v, prevEnc: %v", index, stored, prevEnc) } - newEnc, err := rlp.EncodeToBytes(new) + newEnc, err := s.encDec().Encode(new) if err != nil { return fmt.Errorf("encoding new item: %w", err) } diff --git a/arbnode/dataposter/redis/redisstorage.go b/arbnode/dataposter/redis/redisstorage.go index e6fe666c69..f2393611b2 100644 --- a/arbnode/dataposter/redis/redisstorage.go +++ b/arbnode/dataposter/redis/redisstorage.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" - "github.com/ethereum/go-ethereum/rlp" "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/util/signature" @@ -23,14 +22,15 @@ type Storage struct { client redis.UniversalClient signer *signature.SimpleHmac key string + encDec storage.EncoderDecoderF } -func NewStorage(client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig) (*Storage, error) { +func NewStorage(client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig, enc storage.EncoderDecoderF) (*Storage, error) { signer, err := signature.NewSimpleHmac(signerConf) if err != nil { return nil, err } - return &Storage{client, signer, key}, nil + return &Storage{client, signer, key, enc}, nil } func joinHmacMsg(msg []byte, sig []byte) ([]byte, error) { @@ -65,16 +65,15 @@ func (s *Storage) FetchContents(ctx context.Context, startingIndex uint64, maxRe } var items []*storage.QueuedTransaction for _, itemString := range itemStrings { - var item storage.QueuedTransaction data, err := s.peelVerifySignature([]byte(itemString)) if err != nil { return nil, err } - err = rlp.DecodeBytes(data, &item) + item, err := s.encDec().Decode(data) if err != nil { return nil, err } - items = 
append(items, &item) + items = append(items, item) } return items, nil } @@ -95,16 +94,15 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er } var ret *storage.QueuedTransaction if len(itemStrings) > 0 { - var item storage.QueuedTransaction data, err := s.peelVerifySignature([]byte(itemStrings[0])) if err != nil { return nil, err } - err = rlp.DecodeBytes(data, &item) + item, err := s.encDec().Decode(data) if err != nil { return nil, err } - ret = &item + ret = item } return ret, nil } @@ -116,6 +114,17 @@ func (s *Storage) Prune(ctx context.Context, until uint64) error { return nil } +// normalizeDecoding decodes data (regardless of what encoding it used), and +// encodes it according to current encoding for storage. +// As a result, encoded data is transformed to currently used encoding. +func (s *Storage) normalizeDecoding(data []byte) ([]byte, error) { + item, err := s.encDec().Decode(data) + if err != nil { + return nil, err + } + return s.encDec().Encode(item) +} + func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.QueuedTransaction) error { if new == nil { return fmt.Errorf("tried to insert nil item at index %v", index) @@ -144,21 +153,24 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu if err != nil { return fmt.Errorf("failed to validate item already in redis at index%v: %w", index, err) } - prevItemEncoded, err := rlp.EncodeToBytes(prev) + verifiedItem, err = s.normalizeDecoding(verifiedItem) + if err != nil { + return fmt.Errorf("error normalizing encoding for verified item: %w", err) + } + prevItemEncoded, err := s.encDec().Encode(prev) if err != nil { return err } if !bytes.Equal(verifiedItem, prevItemEncoded) { return fmt.Errorf("%w: replacing different item than expected at index %v", storage.ErrStorageRace, index) } - err = pipe.ZRem(ctx, s.key, haveItems[0]).Err() - if err != nil { + if err := pipe.ZRem(ctx, s.key, haveItems[0]).Err(); err != nil { return 
err } } else { return fmt.Errorf("expected only one return value for Put but got %v", len(haveItems)) } - newItemEncoded, err := rlp.EncodeToBytes(*new) + newItemEncoded, err := s.encDec().Encode(new) if err != nil { return err } @@ -170,11 +182,10 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu if err != nil { return err } - err = pipe.ZAdd(ctx, s.key, &redis.Z{ + if err := pipe.ZAdd(ctx, s.key, &redis.Z{ Score: float64(index), Member: string(signedItem), - }).Err() - if err != nil { + }).Err(); err != nil { return err } _, err = pipe.Exec(ctx) diff --git a/arbnode/dataposter/slice/slicestorage.go b/arbnode/dataposter/slice/slicestorage.go index 6eda5ca9a3..04286df411 100644 --- a/arbnode/dataposter/slice/slicestorage.go +++ b/arbnode/dataposter/slice/slicestorage.go @@ -9,25 +9,17 @@ import ( "errors" "fmt" - "github.com/ethereum/go-ethereum/rlp" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" ) type Storage struct { firstNonce uint64 queue [][]byte + encDec func() storage.EncoderDecoderInterface } -func NewStorage() *Storage { - return &Storage{} -} - -func (s *Storage) decodeItem(data []byte) (*storage.QueuedTransaction, error) { - var item storage.QueuedTransaction - if err := rlp.DecodeBytes(data, &item); err != nil { - return nil, fmt.Errorf("decoding item: %w", err) - } - return &item, nil +func NewStorage(encDec func() storage.EncoderDecoderInterface) *Storage { + return &Storage{encDec: encDec} } func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*storage.QueuedTransaction, error) { @@ -43,7 +35,7 @@ func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResu } var res []*storage.QueuedTransaction for _, r := range txs { - item, err := s.decodeItem(r) + item, err := s.encDec().Decode(r) if err != nil { return nil, err } @@ -56,7 +48,7 @@ func (s *Storage) FetchLast(context.Context) (*storage.QueuedTransaction, error) if len(s.queue) == 0 { 
return nil, nil } - return s.decodeItem(s.queue[len(s.queue)-1]) + return s.encDec().Decode(s.queue[len(s.queue)-1]) } func (s *Storage) Prune(_ context.Context, until uint64) error { @@ -73,7 +65,7 @@ func (s *Storage) Put(_ context.Context, index uint64, prev, new *storage.Queued if new == nil { return fmt.Errorf("tried to insert nil item at index %v", index) } - newEnc, err := rlp.EncodeToBytes(new) + newEnc, err := s.encDec().Encode(new) if err != nil { return fmt.Errorf("encoding new item: %w", err) } @@ -93,7 +85,7 @@ func (s *Storage) Put(_ context.Context, index uint64, prev, new *storage.Queued if queueIdx > len(s.queue) { return fmt.Errorf("attempted to set out-of-bounds index %v in queue starting at %v of length %v", index, s.firstNonce, len(s.queue)) } - prevEnc, err := rlp.EncodeToBytes(prev) + prevEnc, err := s.encDec().Encode(prev) if err != nil { return fmt.Errorf("encoding previous item: %w", err) } diff --git a/arbnode/dataposter/storage/storage.go b/arbnode/dataposter/storage/storage.go index 174ab131ac..b59bf7bf62 100644 --- a/arbnode/dataposter/storage/storage.go +++ b/arbnode/dataposter/storage/storage.go @@ -2,9 +2,13 @@ package storage import ( "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/offchainlabs/nitro/arbutil" ) var ( @@ -17,10 +21,130 @@ var ( ) type QueuedTransaction struct { - FullTx *types.Transaction `rlp:"nil"` + FullTx *types.Transaction Data types.DynamicFeeTx Meta []byte Sent bool Created time.Time // may be earlier than the tx was given to the tx poster NextReplacement time.Time } + +// LegacyQueuedTransaction is used for backwards compatibility. +// Before https://github.com/OffchainLabs/nitro/pull/1773: the queuedTransaction +// looked like this and was rlp encoded directly. After the pr, we are store +// rlp encoding of Meta into queuedTransaction and rlp encoding it once more +// to store it. 
+type LegacyQueuedTransaction struct { + FullTx *types.Transaction + Data types.DynamicFeeTx + Meta BatchPosterPosition + Sent bool + Created time.Time // may be earlier than the tx was given to the tx poster + NextReplacement time.Time +} + +// This is also for legacy reasons. Since BatchPoster is in the arbnode package, +// we can't refer to BatchPosterPosition type there even if we export it (that +// would create a cyclic dependency). +// We'll drop this struct in a few releases when we drop legacy encoding. +type BatchPosterPosition struct { + MessageCount arbutil.MessageIndex + DelayedMessageCount uint64 + NextSeqNum uint64 +} + +func DecodeLegacyQueuedTransaction(data []byte) (*LegacyQueuedTransaction, error) { + var val LegacyQueuedTransaction + if err := rlp.DecodeBytes(data, &val); err != nil { + return nil, fmt.Errorf("decoding legacy queued transaction: %w", err) + } + return &val, nil +} + +func LegacyToQueuedTransaction(legacyQT *LegacyQueuedTransaction) (*QueuedTransaction, error) { + meta, err := rlp.EncodeToBytes(legacyQT.Meta) + if err != nil { + return nil, fmt.Errorf("converting legacy to queued transaction: %w", err) + } + return &QueuedTransaction{ + FullTx: legacyQT.FullTx, + Data: legacyQT.Data, + Meta: meta, + Sent: legacyQT.Sent, + Created: legacyQT.Created, + NextReplacement: legacyQT.NextReplacement, + }, nil +} + +func QueuedTransactionToLegacy(qt *QueuedTransaction) (*LegacyQueuedTransaction, error) { + if qt == nil { + return nil, nil + } + var meta BatchPosterPosition + if qt.Meta != nil { + if err := rlp.DecodeBytes(qt.Meta, &meta); err != nil { + return nil, fmt.Errorf("converting queued transaction to legacy: %w", err) + } + } + return &LegacyQueuedTransaction{ + FullTx: qt.FullTx, + Data: qt.Data, + Meta: meta, + Sent: qt.Sent, + Created: qt.Created, + NextReplacement: qt.NextReplacement, + }, nil +} + +// Decode tries to decode QueuedTransaction, if that fails it tries to decode +// into legacy queued transaction and converts to a queued transaction. 
+func decode(data []byte) (*QueuedTransaction, error) { + var item QueuedTransaction + if err := rlp.DecodeBytes(data, &item); err != nil { + log.Debug("Failed to decode QueuedTransaction, attempting to decode legacy queued transaction", "error", err) + val, err := DecodeLegacyQueuedTransaction(data) + if err != nil { + return nil, fmt.Errorf("decoding legacy item: %w", err) + } + log.Debug("Succeeded decoding QueuedTransaction with legacy encoder") + return LegacyToQueuedTransaction(val) + } + return &item, nil +} + +type EncoderDecoder struct{} + +func (e *EncoderDecoder) Encode(qt *QueuedTransaction) ([]byte, error) { + return rlp.EncodeToBytes(qt) +} + +func (e *EncoderDecoder) Decode(data []byte) (*QueuedTransaction, error) { + return decode(data) +} + +type LegacyEncoderDecoder struct{} + +func (e *LegacyEncoderDecoder) Encode(qt *QueuedTransaction) ([]byte, error) { + legacyQt, err := QueuedTransactionToLegacy(qt) + if err != nil { + return nil, fmt.Errorf("encoding legacy item: %w", err) + } + return rlp.EncodeToBytes(legacyQt) +} + +func (le *LegacyEncoderDecoder) Decode(data []byte) (*QueuedTransaction, error) { + return decode(data) +} + +// Typically interfaces belong to where they are being used, not at implementing +// site, but this is used in all storages (besides no-op) and all of them +// require all the functions for this interface. +type EncoderDecoderInterface interface { + Encode(*QueuedTransaction) ([]byte, error) + Decode([]byte) (*QueuedTransaction, error) +} + +// EncoderDecoderF is a function type that returns an encoder/decoder interface. +// This is needed to implement hot-reloading flag to switch encoding/decoding +// strategy on the fly. 
+type EncoderDecoderF func() EncoderDecoderInterface diff --git a/arbnode/dataposter/storage_test.go b/arbnode/dataposter/storage_test.go index 2424ac0845..d536e5da05 100644 --- a/arbnode/dataposter/storage_test.go +++ b/arbnode/dataposter/storage_test.go @@ -6,8 +6,10 @@ import ( "path" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/offchainlabs/nitro/arbnode/dataposter/leveldb" @@ -21,42 +23,55 @@ import ( var ignoreData = cmp.Options{ cmpopts.IgnoreUnexported( + types.Transaction{}, types.DynamicFeeTx{}, big.Int{}, ), cmpopts.IgnoreFields(types.Transaction{}, "hash", "size", "from"), } -func newLevelDBStorage(t *testing.T) *leveldb.Storage { +func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *leveldb.Storage { t.Helper() db, err := rawdb.NewLevelDBDatabase(path.Join(t.TempDir(), "level.db"), 0, 0, "default", false) if err != nil { t.Fatalf("NewLevelDBDatabase() unexpected error: %v", err) } - return leveldb.New(db) + return leveldb.New(db, encF) } -func newSliceStorage() *slice.Storage { - return slice.NewStorage() +func newSliceStorage(encF storage.EncoderDecoderF) *slice.Storage { + return slice.NewStorage(encF) } -func newRedisStorage(ctx context.Context, t *testing.T) *redis.Storage { +func newRedisStorage(ctx context.Context, t *testing.T, encF storage.EncoderDecoderF) *redis.Storage { t.Helper() redisUrl := redisutil.CreateTestRedis(ctx, t) client, err := redisutil.RedisClientFromURL(redisUrl) if err != nil { t.Fatalf("RedisClientFromURL(%q) unexpected error: %v", redisUrl, err) } - s, err := redis.NewStorage(client, "", &signature.TestSimpleHmacConfig) + s, err := redis.NewStorage(client, "", &signature.TestSimpleHmacConfig, encF) if err != nil { t.Fatalf("redis.NewStorage() unexpected error: %v", err) } return s } -func 
valueOf(i int) *storage.QueuedTransaction { +func valueOf(t *testing.T, i int) *storage.QueuedTransaction { + t.Helper() + meta, err := rlp.EncodeToBytes(storage.BatchPosterPosition{DelayedMessageCount: uint64(i)}) + if err != nil { + t.Fatalf("Encoding batch poster position, error: %v", err) + } return &storage.QueuedTransaction{ - Meta: []byte{byte(i)}, + FullTx: types.NewTransaction( + uint64(i), + common.Address{}, + big.NewInt(int64(i)), + uint64(i), + big.NewInt(int64(i)), + []byte{byte(i)}), + Meta: meta, Data: types.DynamicFeeTx{ ChainID: big.NewInt(int64(i)), Nonce: uint64(i), @@ -73,10 +88,10 @@ func valueOf(i int) *storage.QueuedTransaction { } } -func values(from, to int) []*storage.QueuedTransaction { +func values(t *testing.T, from, to int) []*storage.QueuedTransaction { var res []*storage.QueuedTransaction for i := from; i <= to; i++ { - res = append(res, valueOf(i)) + res = append(res, valueOf(t, i)) } return res } @@ -85,7 +100,7 @@ func values(from, to int) []*storage.QueuedTransaction { func initStorage(ctx context.Context, t *testing.T, s QueueStorage) QueueStorage { t.Helper() for i := 0; i < 20; i++ { - if err := s.Put(ctx, uint64(i), nil, valueOf(i)); err != nil { + if err := s.Put(ctx, uint64(i), nil, valueOf(t, i)); err != nil { t.Fatalf("Error putting a key/value: %v", err) } } @@ -95,10 +110,18 @@ func initStorage(ctx context.Context, t *testing.T, s QueueStorage) QueueStorage // Returns a map of all empty storages. 
func storages(t *testing.T) map[string]QueueStorage { t.Helper() + f := func(enc storage.EncoderDecoderInterface) storage.EncoderDecoderF { + return func() storage.EncoderDecoderInterface { + return enc + } + } return map[string]QueueStorage{ - "levelDB": newLevelDBStorage(t), - "slice": newSliceStorage(), - "redis": newRedisStorage(context.Background(), t), + "levelDBLegacy": newLevelDBStorage(t, f(&storage.LegacyEncoderDecoder{})), + "sliceLegacy": newSliceStorage(f(&storage.LegacyEncoderDecoder{})), + "redisLegacy": newRedisStorage(context.Background(), t, f(&storage.LegacyEncoderDecoder{})), + "levelDB": newLevelDBStorage(t, f(&storage.EncoderDecoder{})), + "slice": newSliceStorage(f(&storage.EncoderDecoder{})), + "redis": newRedisStorage(context.Background(), t, f(&storage.EncoderDecoder{})), } } @@ -125,13 +148,13 @@ func TestFetchContents(t *testing.T) { desc: "sequence with single digits", startIdx: 5, maxResults: 3, - want: values(5, 7), + want: values(t, 5, 7), }, { desc: "corner case of single element", startIdx: 0, maxResults: 1, - want: values(0, 0), + want: values(t, 0, 0), }, { desc: "no elements", @@ -143,13 +166,13 @@ func TestFetchContents(t *testing.T) { desc: "sequence with variable number of digits", startIdx: 9, maxResults: 3, - want: values(9, 11), + want: values(t, 9, 11), }, { desc: "max results goes over the last element", startIdx: 13, maxResults: 10, - want: values(13, 19), + want: values(t, 13, 19), }, } { t.Run(name+"_"+tc.desc, func(t *testing.T) { @@ -171,7 +194,7 @@ func TestLast(t *testing.T) { t.Run(name, func(t *testing.T) { ctx := context.Background() for i := 0; i < cnt; i++ { - val := valueOf(i) + val := valueOf(t, i) if err := s.Put(ctx, uint64(i), nil, val); err != nil { t.Fatalf("Error putting a key/value: %v", err) } @@ -185,12 +208,12 @@ func TestLast(t *testing.T) { } }) - last := valueOf(cnt - 1) + last := valueOf(t, cnt-1) t.Run(name+"_update_entries", func(t *testing.T) { ctx := context.Background() for i := 0; i < 
cnt-1; i++ { - prev := valueOf(i) - newVal := valueOf(cnt + i) + prev := valueOf(t, i) + newVal := valueOf(t, cnt+i) if err := s.Put(ctx, uint64(i), prev, newVal); err != nil { t.Fatalf("Error putting a key/value: %v, prev: %v, new: %v", err, prev, newVal) } @@ -227,17 +250,17 @@ func TestPrune(t *testing.T) { { desc: "prune all but one", pruneFrom: 19, - want: values(19, 19), + want: values(t, 19, 19), }, { desc: "pruning first element", pruneFrom: 1, - want: values(1, 19), + want: values(t, 1, 19), }, { desc: "pruning first 11 elements", pruneFrom: 11, - want: values(11, 19), + want: values(t, 11, 19), }, { desc: "pruning from higher than biggest index", diff --git a/arbos/tx_processor.go b/arbos/tx_processor.go index 09a4692eae..0d44ac548e 100644 --- a/arbos/tx_processor.go +++ b/arbos/tx_processor.go @@ -677,7 +677,7 @@ func (p *TxProcessor) GetPaidGasPrice() *big.Int { if version != 9 { gasPrice = p.evm.Context.BaseFee if p.msg.TxRunMode != core.MessageCommitMode && p.msg.GasFeeCap.Sign() == 0 { - gasPrice.SetInt64(0) // gasprice zero behavior + gasPrice = common.Big0 } } return gasPrice diff --git a/go-ethereum b/go-ethereum index c905292f8a..b4bd0da114 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit c905292f8af601f7fca261e65a7d4bc144261e29 +Subproject commit b4bd0da1142fe6bb81cac7e0794ebb4746b9885a diff --git a/linter/koanf/handlers.go b/linter/koanf/handlers.go index 00cd10c07e..5826004014 100644 --- a/linter/koanf/handlers.go +++ b/linter/koanf/handlers.go @@ -5,7 +5,6 @@ import ( "go/ast" "go/token" "strings" - "unicode" "github.com/fatih/structtag" "golang.org/x/tools/go/analysis" @@ -219,22 +218,7 @@ func checkStruct(pass *analysis.Pass, s *ast.StructType) Result { } func normalizeTag(s string) string { - ans := s[:1] - for i := 1; i < len(s); i++ { - c := rune(s[i]) - if !isAlphanumeric(c) { - continue - } - if !isAlphanumeric(rune(s[i-1])) && unicode.IsLower(c) { - c = unicode.ToUpper(c) - } - ans += string(c) - } - return 
ans -} - -func isAlphanumeric(c rune) bool { - return unicode.IsLetter(c) || unicode.IsDigit(c) + return strings.ReplaceAll(s, "-", "") } func normalizeID(pass *analysis.Pass, id string) string { diff --git a/nitro-testnode b/nitro-testnode index 14f24a1bad..7ad12c0f1b 160000 --- a/nitro-testnode +++ b/nitro-testnode @@ -1 +1 @@ -Subproject commit 14f24a1bad2625412602d06156156c380bd589d2 +Subproject commit 7ad12c0f1be75a72c7360d5258e0090f8225594e diff --git a/staker/block_validator.go b/staker/block_validator.go index f04b852041..94bc2a0806 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -597,7 +597,7 @@ func (v *BlockValidator) iterativeValidationPrint(ctx context.Context) time.Dura var batchMsgs arbutil.MessageIndex var printedCount int64 if validated.GlobalState.Batch > 0 { - batchMsgs, err = v.inboxTracker.GetBatchMessageCount(validated.GlobalState.Batch) + batchMsgs, err = v.inboxTracker.GetBatchMessageCount(validated.GlobalState.Batch - 1) } if err != nil { printedCount = -1 diff --git a/system_tests/conditionaltx_test.go b/system_tests/conditionaltx_test.go index c65103694a..14aa000313 100644 --- a/system_tests/conditionaltx_test.go +++ b/system_tests/conditionaltx_test.go @@ -16,7 +16,7 @@ import ( "github.com/ethereum/go-ethereum/arbitrum" "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" @@ -103,23 +103,23 @@ func getOptions(address common.Address, rootHash common.Hash, slotValueMap map[c } func getFulfillableBlockTimeLimits(t *testing.T, blockNumber uint64, timestamp uint64) []*arbitrum_types.ConditionalOptions { - future := hexutil.Uint64(timestamp + 30) - past := hexutil.Uint64(timestamp - 1) - futureBlockNumber := hexutil.Uint64(blockNumber + 1000) - 
currentBlockNumber := hexutil.Uint64(blockNumber) + future := math.HexOrDecimal64(timestamp + 30) + past := math.HexOrDecimal64(timestamp - 1) + futureBlockNumber := math.HexOrDecimal64(blockNumber + 1000) + currentBlockNumber := math.HexOrDecimal64(blockNumber) return getBlockTimeLimits(t, currentBlockNumber, futureBlockNumber, past, future) } func getUnfulfillableBlockTimeLimits(t *testing.T, blockNumber uint64, timestamp uint64) []*arbitrum_types.ConditionalOptions { - future := hexutil.Uint64(timestamp + 30) - past := hexutil.Uint64(timestamp - 1) - futureBlockNumber := hexutil.Uint64(blockNumber + 1000) - previousBlockNumber := hexutil.Uint64(blockNumber - 1) + future := math.HexOrDecimal64(timestamp + 30) + past := math.HexOrDecimal64(timestamp - 1) + futureBlockNumber := math.HexOrDecimal64(blockNumber + 1000) + previousBlockNumber := math.HexOrDecimal64(blockNumber - 1) // skip first empty options return getBlockTimeLimits(t, futureBlockNumber, previousBlockNumber, future, past)[1:] } -func getBlockTimeLimits(t *testing.T, blockMin, blockMax hexutil.Uint64, timeMin, timeMax hexutil.Uint64) []*arbitrum_types.ConditionalOptions { +func getBlockTimeLimits(t *testing.T, blockMin, blockMax math.HexOrDecimal64, timeMin, timeMax math.HexOrDecimal64) []*arbitrum_types.ConditionalOptions { basic := []*arbitrum_types.ConditionalOptions{ {}, {TimestampMin: &timeMin}, @@ -157,9 +157,9 @@ func optionsProduct(optionsA, optionsB []*arbitrum_types.ConditionalOptions) []* c.KnownAccounts[k] = v } limitTriples := []struct { - a *hexutil.Uint64 - b *hexutil.Uint64 - c **hexutil.Uint64 + a *math.HexOrDecimal64 + b *math.HexOrDecimal64 + c **math.HexOrDecimal64 }{ {a.BlockNumberMin, b.BlockNumberMin, &c.BlockNumberMin}, {a.BlockNumberMax, b.BlockNumberMax, &c.BlockNumberMax}, @@ -168,10 +168,10 @@ func optionsProduct(optionsA, optionsB []*arbitrum_types.ConditionalOptions) []* } for _, tripple := range limitTriples { if tripple.b != nil { - value := hexutil.Uint64(*tripple.b) + 
value := math.HexOrDecimal64(*tripple.b) *tripple.c = &value } else if tripple.a != nil { - value := hexutil.Uint64(*tripple.a) + value := math.HexOrDecimal64(*tripple.a) *tripple.c = &value } else { *tripple.c = nil