Skip to content

Commit

Permalink
Merge branch 'master' into seq-coordinator-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli authored Sep 5, 2023
2 parents c2a6a54 + 9a65598 commit 482c1d6
Show file tree
Hide file tree
Showing 15 changed files with 546 additions and 120 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ contracts/test/prover/proofs/%.json: $(arbitrator_cases)/%.wasm $(arbitrator_pro
# strategic rules to minimize dependency building

.make/lint: $(DEP_PREDICATE) build-node-deps $(ORDER_ONLY_PREDICATE) .make
go run linter/koanf/koanf.go ./...
go run linter/pointercheck/pointer.go ./...
golangci-lint run --fix
yarn --cwd contracts solhint
Expand Down
46 changes: 24 additions & 22 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstAccErr time.Time // first time a continuous missing accumulator occurred
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
Expand Down Expand Up @@ -309,8 +309,8 @@ func (b *BatchPoster) pollForReverts(ctx context.Context) {
// - polling is through context, or
// - we see a transaction in the block from dataposter that was reverted.
select {
case h, closed := <-headerCh:
if closed {
case h, ok := <-headerCh:
if !ok {
log.Info("L1 headers channel has been closed")
return
}
Expand Down Expand Up @@ -970,20 +970,22 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
return b.config().PollInterval
}
posted, err := b.maybePostSequencerBatch(ctx)
ephemeralError := errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace)
if !ephemeralError {
b.firstEphemeralError = time.Time{}
}
if err != nil {
b.building = nil
logLevel := log.Error
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) {
if ephemeralError {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstAccErr == (time.Time{}) {
b.firstAccErr = time.Now()
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
logLevel = log.Debug
} else if time.Since(b.firstAccErr) < time.Minute {
} else if time.Since(b.firstEphemeralError) < time.Minute {
logLevel = log.Debug
}
} else {
b.firstAccErr = time.Time{}
}
logLevel("error posting batch", "err", err)
return b.config().ErrorDelay
Expand Down
24 changes: 18 additions & 6 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,23 @@ func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, a
initConfig.UseNoOpStorage = true
log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool")
}
encF := func() storage.EncoderDecoderInterface {
if config().LegacyStorageEncoding {
return &storage.LegacyEncoderDecoder{}
}
return &storage.EncoderDecoder{}
}
var queue QueueStorage
switch {
case initConfig.UseNoOpStorage:
queue = &noop.Storage{}
case initConfig.UseLevelDB:
queue = leveldb.New(db)
queue = leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
case redisClient == nil:
queue = slice.NewStorage()
queue = slice.NewStorage(func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
default:
var err error
queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner)
queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner, encF)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +183,7 @@ func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []by
}
lastQueueItem, err := p.queue.FetchLast(ctx)
if err != nil {
return 0, nil, false, err
return 0, nil, false, fmt.Errorf("fetching last element from queue: %w", err)
}
if lastQueueItem != nil {
nextNonce := lastQueueItem.Data.Nonce + 1
Expand Down Expand Up @@ -367,7 +373,10 @@ func (p *DataPoster) saveTx(ctx context.Context, prevTx, newTx *storage.QueuedTr
if prevTx != nil && prevTx.Data.Nonce != newTx.Data.Nonce {
return fmt.Errorf("prevTx nonce %v doesn't match newTx nonce %v", prevTx.Data.Nonce, newTx.Data.Nonce)
}
return p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx)
if err := p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx); err != nil {
return fmt.Errorf("putting new tx in the queue: %w", err)
}
return nil
}

func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransaction, newTx *storage.QueuedTransaction) error {
Expand Down Expand Up @@ -549,7 +558,7 @@ func (p *DataPoster) Start(ctxIn context.Context) {
// replacing them by fee.
queueContents, err := p.queue.FetchContents(ctx, unconfirmedNonce, maxTxsToRbf)
if err != nil {
log.Error("Failed to get tx queue contents", "err", err)
log.Error("Failed to fetch tx queue contents", "err", err)
return minWait
}
for index, tx := range queueContents {
Expand Down Expand Up @@ -621,6 +630,7 @@ type DataPosterConfig struct {
AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"`
UseLevelDB bool `koanf:"use-leveldb"`
UseNoOpStorage bool `koanf:"use-noop-storage"`
LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"`
}

// ConfigFetcher function type is used instead of directly passing config so
Expand All @@ -641,6 +651,7 @@ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".allocate-mempool-balance", DefaultDataPosterConfig.AllocateMempoolBalance, "if true, don't put transactions in the mempool that spend a total greater than the batch poster's balance")
f.Bool(prefix+".use-leveldb", DefaultDataPosterConfig.UseLevelDB, "uses leveldb when enabled")
f.Bool(prefix+".use-noop-storage", DefaultDataPosterConfig.UseNoOpStorage, "uses noop storage, it doesn't store anything")
f.Bool(prefix+".legacy-storage-encoding", DefaultDataPosterConfig.LegacyStorageEncoding, "encodes items in a legacy way (as it was before dropping generics)")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

Expand All @@ -656,6 +667,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
AllocateMempoolBalance: true,
UseLevelDB: false,
UseNoOpStorage: false,
LegacyStorageEncoding: true,
}

var DefaultDataPosterConfigForValidator = func() DataPosterConfig {
Expand Down
26 changes: 9 additions & 17 deletions arbnode/dataposter/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/syndtr/goleveldb/leveldb"
)

// Storage implements leveldb based storage for batch poster.
type Storage struct {
db ethdb.Database
db ethdb.Database
encDec storage.EncoderDecoderF
}

var (
Expand All @@ -31,16 +31,8 @@ var (
countKey = []byte(".count_key")
)

func New(db ethdb.Database) *Storage {
return &Storage{db: db}
}

func (s *Storage) decodeItem(data []byte) (*storage.QueuedTransaction, error) {
var item storage.QueuedTransaction
if err := rlp.DecodeBytes(data, &item); err != nil {
return nil, fmt.Errorf("decoding item: %w", err)
}
return &item, nil
func New(db ethdb.Database, enc storage.EncoderDecoderF) *Storage {
return &Storage{db: db, encDec: enc}
}

func idxToKey(idx uint64) []byte {
Expand All @@ -55,7 +47,7 @@ func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResu
if !it.Next() {
break
}
item, err := s.decodeItem(it.Value())
item, err := s.encDec().Decode(it.Value())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -84,7 +76,7 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er
if err != nil {
return nil, err
}
return s.decodeItem(val)
return s.encDec().Decode(val)
}

func (s *Storage) Prune(ctx context.Context, until uint64) error {
Expand Down Expand Up @@ -117,7 +109,7 @@ func (s *Storage) valueAt(_ context.Context, key []byte) ([]byte, error) {
val, err := s.db.Get(key)
if err != nil {
if isErrNotFound(err) {
return rlp.EncodeToBytes((*storage.QueuedTransaction)(nil))
return s.encDec().Encode((*storage.QueuedTransaction)(nil))
}
return nil, err
}
Expand All @@ -130,14 +122,14 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return err
}
prevEnc, err := rlp.EncodeToBytes(prev)
prevEnc, err := s.encDec().Encode(prev)
if err != nil {
return fmt.Errorf("encoding previous item: %w", err)
}
if !bytes.Equal(stored, prevEnc) {
return fmt.Errorf("replacing different item than expected at index: %v, stored: %v, prevEnc: %v", index, stored, prevEnc)
}
newEnc, err := rlp.EncodeToBytes(new)
newEnc, err := s.encDec().Encode(new)
if err != nil {
return fmt.Errorf("encoding new item: %w", err)
}
Expand Down
43 changes: 27 additions & 16 deletions arbnode/dataposter/redis/redisstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/rlp"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/util/signature"
Expand All @@ -23,14 +22,15 @@ type Storage struct {
client redis.UniversalClient
signer *signature.SimpleHmac
key string
encDec storage.EncoderDecoderF
}

func NewStorage(client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig) (*Storage, error) {
func NewStorage(client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig, enc storage.EncoderDecoderF) (*Storage, error) {
signer, err := signature.NewSimpleHmac(signerConf)
if err != nil {
return nil, err
}
return &Storage{client, signer, key}, nil
return &Storage{client, signer, key, enc}, nil
}

func joinHmacMsg(msg []byte, sig []byte) ([]byte, error) {
Expand Down Expand Up @@ -65,16 +65,15 @@ func (s *Storage) FetchContents(ctx context.Context, startingIndex uint64, maxRe
}
var items []*storage.QueuedTransaction
for _, itemString := range itemStrings {
var item storage.QueuedTransaction
data, err := s.peelVerifySignature([]byte(itemString))
if err != nil {
return nil, err
}
err = rlp.DecodeBytes(data, &item)
item, err := s.encDec().Decode(data)
if err != nil {
return nil, err
}
items = append(items, &item)
items = append(items, item)
}
return items, nil
}
Expand All @@ -95,16 +94,15 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er
}
var ret *storage.QueuedTransaction
if len(itemStrings) > 0 {
var item storage.QueuedTransaction
data, err := s.peelVerifySignature([]byte(itemStrings[0]))
if err != nil {
return nil, err
}
err = rlp.DecodeBytes(data, &item)
item, err := s.encDec().Decode(data)
if err != nil {
return nil, err
}
ret = &item
ret = item
}
return ret, nil
}
Expand All @@ -116,6 +114,17 @@ func (s *Storage) Prune(ctx context.Context, until uint64) error {
return nil
}

// normalizeDecoding decodes data (regardless of what encoding it used), and
// encodes it according to current encoding for storage.
// As a result, encoded data is transformed to currently used encoding.
func (s *Storage) normalizeDecoding(data []byte) ([]byte, error) {
item, err := s.encDec().Decode(data)
if err != nil {
return nil, err
}
return s.encDec().Encode(item)
}

func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.QueuedTransaction) error {
if new == nil {
return fmt.Errorf("tried to insert nil item at index %v", index)
Expand Down Expand Up @@ -144,21 +153,24 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return fmt.Errorf("failed to validate item already in redis at index%v: %w", index, err)
}
prevItemEncoded, err := rlp.EncodeToBytes(prev)
verifiedItem, err = s.normalizeDecoding(verifiedItem)
if err != nil {
return fmt.Errorf("error normalizing encoding for verified item: %w", err)
}
prevItemEncoded, err := s.encDec().Encode(prev)
if err != nil {
return err
}
if !bytes.Equal(verifiedItem, prevItemEncoded) {
return fmt.Errorf("%w: replacing different item than expected at index %v", storage.ErrStorageRace, index)
}
err = pipe.ZRem(ctx, s.key, haveItems[0]).Err()
if err != nil {
if err := pipe.ZRem(ctx, s.key, haveItems[0]).Err(); err != nil {
return err
}
} else {
return fmt.Errorf("expected only one return value for Put but got %v", len(haveItems))
}
newItemEncoded, err := rlp.EncodeToBytes(*new)
newItemEncoded, err := s.encDec().Encode(new)
if err != nil {
return err
}
Expand All @@ -170,11 +182,10 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return err
}
err = pipe.ZAdd(ctx, s.key, &redis.Z{
if err := pipe.ZAdd(ctx, s.key, &redis.Z{
Score: float64(index),
Member: string(signedItem),
}).Err()
if err != nil {
}).Err(); err != nil {
return err
}
_, err = pipe.Exec(ctx)
Expand Down
Loading

0 comments on commit 482c1d6

Please sign in to comment.