Skip to content

Commit

Permalink
Merge branch 'koanf-cont-cont' of github.com:OffchainLabs/nitro into …
Browse files Browse the repository at this point in the history
…koanf-cont-cont
  • Loading branch information
anodar committed Sep 4, 2023
2 parents 800c0a4 + 1117ce5 commit 3be771f
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 81 deletions.
24 changes: 18 additions & 6 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,23 @@ func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, a
initConfig.UseNoOpStorage = true
log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool")
}
encF := func() storage.EncoderDecoderInterface {
if config().LegacyStorageEncoding {
return &storage.LegacyEncoderDecoder{}
}
return &storage.EncoderDecoder{}
}
var queue QueueStorage
switch {
case initConfig.UseNoOpStorage:
queue = &noop.Storage{}
case initConfig.UseLevelDB:
queue = leveldb.New(db)
queue = leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
case redisClient == nil:
queue = slice.NewStorage()
queue = slice.NewStorage(func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
default:
var err error
queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner)
queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner, encF)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +183,7 @@ func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []by
}
lastQueueItem, err := p.queue.FetchLast(ctx)
if err != nil {
return 0, nil, false, err
return 0, nil, false, fmt.Errorf("fetching last element from queue: %w", err)
}
if lastQueueItem != nil {
nextNonce := lastQueueItem.Data.Nonce + 1
Expand Down Expand Up @@ -367,7 +373,10 @@ func (p *DataPoster) saveTx(ctx context.Context, prevTx, newTx *storage.QueuedTr
if prevTx != nil && prevTx.Data.Nonce != newTx.Data.Nonce {
return fmt.Errorf("prevTx nonce %v doesn't match newTx nonce %v", prevTx.Data.Nonce, newTx.Data.Nonce)
}
return p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx)
if err := p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx); err != nil {
return fmt.Errorf("putting new tx in the queue: %w", err)
}
return nil
}

func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransaction, newTx *storage.QueuedTransaction) error {
Expand Down Expand Up @@ -549,7 +558,7 @@ func (p *DataPoster) Start(ctxIn context.Context) {
// replacing them by fee.
queueContents, err := p.queue.FetchContents(ctx, unconfirmedNonce, maxTxsToRbf)
if err != nil {
log.Error("Failed to get tx queue contents", "err", err)
log.Error("Failed to fetch tx queue contents", "err", err)
return minWait
}
for index, tx := range queueContents {
Expand Down Expand Up @@ -621,6 +630,7 @@ type DataPosterConfig struct {
AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"`
UseLevelDB bool `koanf:"use-leveldb"`
UseNoOpStorage bool `koanf:"use-noop-storage"`
LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"`
}

// ConfigFetcher function type is used instead of directly passing config so
Expand All @@ -641,6 +651,7 @@ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".allocate-mempool-balance", DefaultDataPosterConfig.AllocateMempoolBalance, "if true, don't put transactions in the mempool that spend a total greater than the batch poster's balance")
f.Bool(prefix+".use-leveldb", DefaultDataPosterConfig.UseLevelDB, "uses leveldb when enabled")
f.Bool(prefix+".use-noop-storage", DefaultDataPosterConfig.UseNoOpStorage, "uses noop storage, it doesn't store anything")
f.Bool(prefix+".legacy-storage-encoding", DefaultDataPosterConfig.LegacyStorageEncoding, "encodes items in a legacy way (as it was before dropping generics)")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

Expand All @@ -656,6 +667,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
AllocateMempoolBalance: true,
UseLevelDB: false,
UseNoOpStorage: false,
LegacyStorageEncoding: true,
}

var DefaultDataPosterConfigForValidator = func() DataPosterConfig {
Expand Down
26 changes: 9 additions & 17 deletions arbnode/dataposter/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/syndtr/goleveldb/leveldb"
)

// Storage implements leveldb based storage for batch poster.
type Storage struct {
db ethdb.Database
db ethdb.Database
encDec storage.EncoderDecoderF
}

var (
Expand All @@ -31,16 +31,8 @@ var (
countKey = []byte(".count_key")
)

func New(db ethdb.Database) *Storage {
return &Storage{db: db}
}

func (s *Storage) decodeItem(data []byte) (*storage.QueuedTransaction, error) {
var item storage.QueuedTransaction
if err := rlp.DecodeBytes(data, &item); err != nil {
return nil, fmt.Errorf("decoding item: %w", err)
}
return &item, nil
func New(db ethdb.Database, enc storage.EncoderDecoderF) *Storage {
return &Storage{db: db, encDec: enc}
}

func idxToKey(idx uint64) []byte {
Expand All @@ -55,7 +47,7 @@ func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResu
if !it.Next() {
break
}
item, err := s.decodeItem(it.Value())
item, err := s.encDec().Decode(it.Value())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -84,7 +76,7 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er
if err != nil {
return nil, err
}
return s.decodeItem(val)
return s.encDec().Decode(val)
}

func (s *Storage) Prune(ctx context.Context, until uint64) error {
Expand Down Expand Up @@ -117,7 +109,7 @@ func (s *Storage) valueAt(_ context.Context, key []byte) ([]byte, error) {
val, err := s.db.Get(key)
if err != nil {
if isErrNotFound(err) {
return rlp.EncodeToBytes((*storage.QueuedTransaction)(nil))
return s.encDec().Encode((*storage.QueuedTransaction)(nil))
}
return nil, err
}
Expand All @@ -130,14 +122,14 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return err
}
prevEnc, err := rlp.EncodeToBytes(prev)
prevEnc, err := s.encDec().Encode(prev)
if err != nil {
return fmt.Errorf("encoding previous item: %w", err)
}
if !bytes.Equal(stored, prevEnc) {
return fmt.Errorf("replacing different item than expected at index: %v, stored: %v, prevEnc: %v", index, stored, prevEnc)
}
newEnc, err := rlp.EncodeToBytes(new)
newEnc, err := s.encDec().Encode(new)
if err != nil {
return fmt.Errorf("encoding new item: %w", err)
}
Expand Down
43 changes: 27 additions & 16 deletions arbnode/dataposter/redis/redisstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/rlp"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/util/signature"
Expand All @@ -23,14 +22,15 @@ type Storage struct {
client redis.UniversalClient
signer *signature.SimpleHmac
key string
encDec storage.EncoderDecoderF
}

func NewStorage(client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig) (*Storage, error) {
func NewStorage(client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig, enc storage.EncoderDecoderF) (*Storage, error) {
signer, err := signature.NewSimpleHmac(signerConf)
if err != nil {
return nil, err
}
return &Storage{client, signer, key}, nil
return &Storage{client, signer, key, enc}, nil
}

func joinHmacMsg(msg []byte, sig []byte) ([]byte, error) {
Expand Down Expand Up @@ -65,16 +65,15 @@ func (s *Storage) FetchContents(ctx context.Context, startingIndex uint64, maxRe
}
var items []*storage.QueuedTransaction
for _, itemString := range itemStrings {
var item storage.QueuedTransaction
data, err := s.peelVerifySignature([]byte(itemString))
if err != nil {
return nil, err
}
err = rlp.DecodeBytes(data, &item)
item, err := s.encDec().Decode(data)
if err != nil {
return nil, err
}
items = append(items, &item)
items = append(items, item)
}
return items, nil
}
Expand All @@ -95,16 +94,15 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er
}
var ret *storage.QueuedTransaction
if len(itemStrings) > 0 {
var item storage.QueuedTransaction
data, err := s.peelVerifySignature([]byte(itemStrings[0]))
if err != nil {
return nil, err
}
err = rlp.DecodeBytes(data, &item)
item, err := s.encDec().Decode(data)
if err != nil {
return nil, err
}
ret = &item
ret = item
}
return ret, nil
}
Expand All @@ -116,6 +114,17 @@ func (s *Storage) Prune(ctx context.Context, until uint64) error {
return nil
}

// normalizeDecoding decodes data (regardless of what encoding it used), and
// encodes it according to current encoding for storage.
// As a result, encoded data is transformed to currently used encoding.
func (s *Storage) normalizeDecoding(data []byte) ([]byte, error) {
item, err := s.encDec().Decode(data)
if err != nil {
return nil, err
}
return s.encDec().Encode(item)
}

func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.QueuedTransaction) error {
if new == nil {
return fmt.Errorf("tried to insert nil item at index %v", index)
Expand Down Expand Up @@ -144,21 +153,24 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return fmt.Errorf("failed to validate item already in redis at index%v: %w", index, err)
}
prevItemEncoded, err := rlp.EncodeToBytes(prev)
verifiedItem, err = s.normalizeDecoding(verifiedItem)
if err != nil {
return fmt.Errorf("error normalizing encoding for verified item: %w", err)
}
prevItemEncoded, err := s.encDec().Encode(prev)
if err != nil {
return err
}
if !bytes.Equal(verifiedItem, prevItemEncoded) {
return fmt.Errorf("%w: replacing different item than expected at index %v", storage.ErrStorageRace, index)
}
err = pipe.ZRem(ctx, s.key, haveItems[0]).Err()
if err != nil {
if err := pipe.ZRem(ctx, s.key, haveItems[0]).Err(); err != nil {
return err
}
} else {
return fmt.Errorf("expected only one return value for Put but got %v", len(haveItems))
}
newItemEncoded, err := rlp.EncodeToBytes(*new)
newItemEncoded, err := s.encDec().Encode(new)
if err != nil {
return err
}
Expand All @@ -170,11 +182,10 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return err
}
err = pipe.ZAdd(ctx, s.key, &redis.Z{
if err := pipe.ZAdd(ctx, s.key, &redis.Z{
Score: float64(index),
Member: string(signedItem),
}).Err()
if err != nil {
}).Err(); err != nil {
return err
}
_, err = pipe.Exec(ctx)
Expand Down
22 changes: 7 additions & 15 deletions arbnode/dataposter/slice/slicestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,17 @@ import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/rlp"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
)

type Storage struct {
firstNonce uint64
queue [][]byte
encDec func() storage.EncoderDecoderInterface
}

func NewStorage() *Storage {
return &Storage{}
}

func (s *Storage) decodeItem(data []byte) (*storage.QueuedTransaction, error) {
var item storage.QueuedTransaction
if err := rlp.DecodeBytes(data, &item); err != nil {
return nil, fmt.Errorf("decoding item: %w", err)
}
return &item, nil
func NewStorage(encDec func() storage.EncoderDecoderInterface) *Storage {
return &Storage{encDec: encDec}
}

func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*storage.QueuedTransaction, error) {
Expand All @@ -43,7 +35,7 @@ func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResu
}
var res []*storage.QueuedTransaction
for _, r := range txs {
item, err := s.decodeItem(r)
item, err := s.encDec().Decode(r)
if err != nil {
return nil, err
}
Expand All @@ -56,7 +48,7 @@ func (s *Storage) FetchLast(context.Context) (*storage.QueuedTransaction, error)
if len(s.queue) == 0 {
return nil, nil
}
return s.decodeItem(s.queue[len(s.queue)-1])
return s.encDec().Decode(s.queue[len(s.queue)-1])
}

func (s *Storage) Prune(_ context.Context, until uint64) error {
Expand All @@ -73,7 +65,7 @@ func (s *Storage) Put(_ context.Context, index uint64, prev, new *storage.Queued
if new == nil {
return fmt.Errorf("tried to insert nil item at index %v", index)
}
newEnc, err := rlp.EncodeToBytes(new)
newEnc, err := s.encDec().Encode(new)
if err != nil {
return fmt.Errorf("encoding new item: %w", err)
}
Expand All @@ -93,7 +85,7 @@ func (s *Storage) Put(_ context.Context, index uint64, prev, new *storage.Queued
if queueIdx > len(s.queue) {
return fmt.Errorf("attempted to set out-of-bounds index %v in queue starting at %v of length %v", index, s.firstNonce, len(s.queue))
}
prevEnc, err := rlp.EncodeToBytes(prev)
prevEnc, err := s.encDec().Encode(prev)
if err != nil {
return fmt.Errorf("encoding previous item: %w", err)
}
Expand Down
Loading

0 comments on commit 3be771f

Please sign in to comment.