Merge branch 'koanf-cont-cont' into enable-custom-linter-ci
anodar authored Sep 4, 2023
2 parents 293ed4c + 1117ce5 commit cd13aa1
Showing 16 changed files with 312 additions and 159 deletions.
4 changes: 2 additions & 2 deletions arbitrator/jit/src/syscall.rs
@@ -306,10 +306,10 @@ pub fn js_value_index(mut env: WasmEnvMut, sp: u32) {

pub fn js_value_call(mut env: WasmEnvMut, sp: u32) -> MaybeEscape {
let Some(resume) = env.data().exports.resume.clone() else {
return Escape::failure(format!("wasmer failed to bind {}", "resume".red()))
return Escape::failure(format!("wasmer failed to bind {}", "resume".red()));
};
let Some(get_stack_pointer) = env.data().exports.get_stack_pointer.clone() else {
return Escape::failure(format!("wasmer failed to bind {}", "getsp".red()))
return Escape::failure(format!("wasmer failed to bind {}", "getsp".red()));
};
let sp = GoStack::simple(sp, &env);
let data = env.data_mut();
2 changes: 1 addition & 1 deletion arbitrator/prover/src/machine.rs
@@ -651,7 +651,7 @@ pub struct MachineState<'a> {
initial_hash: Bytes32,
}

pub type PreimageResolver = Arc<dyn Fn(u64, Bytes32) -> Option<CBytes>>;
pub type PreimageResolver = Arc<dyn Fn(u64, Bytes32) -> Option<CBytes> + Send + Sync>;

/// Wraps a preimage resolver to provide an easier API
/// and cache the last preimage retrieved.
7 changes: 7 additions & 0 deletions arbitrator/prover/src/utils.rs
@@ -158,6 +158,13 @@ impl From<&[u8]> for CBytes {
}
}

// There are no thread safety concerns for CBytes.
// This type is basically a Box<[u8]> (which is Send + Sync) with libc as an allocator.
// Any data races between threads are prevented by Rust borrowing rules,
// and the data isn't thread-local so there's no concern moving it between threads.
unsafe impl Send for CBytes {}
unsafe impl Sync for CBytes {}

#[derive(Serialize, Deserialize)]
#[serde(remote = "Type")]
enum RemoteType {
49 changes: 26 additions & 23 deletions arbnode/batch_poster.go
@@ -56,20 +56,20 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstAccErr time.Time // first time a continuous missing accumulator occurred
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
bridge *bridgegen.Bridge
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
@@ -309,8 +309,8 @@ func (b *BatchPoster) pollForReverts(ctx context.Context) {
// - polling is through context, or
// - we see a transaction in the block from dataposter that was reverted.
select {
case h, closed := <-headerCh:
if closed {
case h, ok := <-headerCh:
if !ok {
log.Info("L1 headers channel has been closed")
return
}
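
The rename from closed to ok above fixes an inverted check: the second value of a Go channel receive reports whether a value was received (true) and only turns false once the channel is closed and drained, so the old code would have treated every successful header as a closed channel. A minimal, self-contained sketch of the comma-ok receive idiom (illustrative only, not repository code):

package main

import "fmt"

// drain reads until the channel is closed, using the comma-ok form to tell
// "got a value" apart from "channel closed and empty".
func drain(ch <-chan int) {
	for {
		v, ok := <-ch // ok is true while values arrive, false once ch is closed and drained
		if !ok {
			fmt.Println("channel closed")
			return
		}
		fmt.Println("got", v)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	drain(ch)
}
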
@@ -787,7 +787,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if err != nil {
return false, err
}
blockNumberWithPadding := arbmath.SaturatingUAdd(arbmath.BigToUintSaturating(latestHeader.Number), uint64(config.L1BlockBoundBypass/ethPosBlockTime))
latestBlockNumber := arbutil.ParentHeaderToL1BlockNumber(latestHeader)
blockNumberWithPadding := arbmath.SaturatingUAdd(latestBlockNumber, uint64(config.L1BlockBoundBypass/ethPosBlockTime))
timestampWithPadding := arbmath.SaturatingUAdd(latestHeader.Time, uint64(config.L1BlockBoundBypass/time.Second))

l1BoundMinBlockNumber = arbmath.SaturatingUSub(blockNumberWithPadding, arbmath.BigToUintSaturating(maxTimeVariation.DelayBlocks))
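
The hunk above switches from latestHeader.Number to arbutil.ParentHeaderToL1BlockNumber, so the bypass padding is added to a real L1 block number even when the parent chain is itself an Arbitrum chain. As a small worked example of the padding arithmetic, under assumed values (a one-minute L1BlockBoundBypass and a 12-second post-merge block time; neither is claimed to be the repository's default):

package main

import (
	"fmt"
	"time"
)

// ethPosBlockTime is an assumption matching the post-merge Ethereum slot time.
const ethPosBlockTime = 12 * time.Second

// saturatingUAdd clamps on overflow instead of wrapping, in the spirit of
// arbmath.SaturatingUAdd (simplified stand-in, not the repository's code).
func saturatingUAdd(a, b uint64) uint64 {
	if a+b < a {
		return ^uint64(0)
	}
	return a + b
}

func main() {
	latestBlockNumber := uint64(18_000_000)     // hypothetical L1 block number
	bypass := time.Minute                       // hypothetical L1BlockBoundBypass
	padding := uint64(bypass / ethPosBlockTime) // 60s / 12s = 5 blocks of padding
	fmt.Println(saturatingUAdd(latestBlockNumber, padding)) // 18000005
}
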
@@ -969,20 +970,22 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
return b.config().PollInterval
}
posted, err := b.maybePostSequencerBatch(ctx)
ephemeralError := errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace)
if !ephemeralError {
b.firstEphemeralError = time.Time{}
}
if err != nil {
b.building = nil
logLevel := log.Error
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) {
if ephemeralError {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstAccErr == (time.Time{}) {
b.firstAccErr = time.Now()
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
logLevel = log.Debug
} else if time.Since(b.firstAccErr) < time.Minute {
} else if time.Since(b.firstEphemeralError) < time.Minute {
logLevel = log.Debug
}
} else {
b.firstAccErr = time.Time{}
}
logLevel("error posting batch", "err", err)
return b.config().ErrorDelay
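
The batch_poster.go changes above generalize the old firstAccErr timer into firstEphemeralError: any error suspected to be ephemeral (AccumulatorNotFoundErr or storage.ErrStorageRace) is logged at debug level for its first minute of continuous occurrence and escalates to error afterwards, and the timer resets whenever the error is absent or non-ephemeral. A hedged sketch of that quiet-period logging pattern; the type, sentinel error, and string levels below are illustrative stand-ins, not the repository's:

package main

import (
	"errors"
	"fmt"
	"time"
)

// errEphemeral stands in for errors like AccumulatorNotFoundErr that are
// expected to clear on their own once the inbox tracker catches up.
var errEphemeral = errors.New("accumulator not found")

type poster struct {
	firstEphemeralError time.Time
}

// logLevelFor returns "debug" while an ephemeral error has been continuously
// observed for less than a minute, and "error" otherwise.
func (p *poster) logLevelFor(err error) string {
	if !errors.Is(err, errEphemeral) {
		p.firstEphemeralError = time.Time{} // reset the window on non-ephemeral outcomes
		return "error"
	}
	if p.firstEphemeralError.IsZero() {
		p.firstEphemeralError = time.Now() // window just opened
		return "debug"
	}
	if time.Since(p.firstEphemeralError) < time.Minute {
		return "debug"
	}
	return "error" // it persisted, so it is probably not ephemeral after all
}

func main() {
	p := &poster{}
	fmt.Println(p.logLevelFor(errEphemeral))                  // debug
	fmt.Println(p.logLevelFor(errors.New("revert detected"))) // error, and the window resets
}
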
63 changes: 40 additions & 23 deletions arbnode/dataposter/data_poster.go
@@ -101,17 +101,23 @@ func NewDataPoster(db ethdb.Database, headerReader *headerreader.HeaderReader, a
initConfig.UseNoOpStorage = true
log.Info("Disabling data poster storage, as parent chain appears to be an Arbitrum chain without a mempool")
}
encF := func() storage.EncoderDecoderInterface {
if config().LegacyStorageEncoding {
return &storage.LegacyEncoderDecoder{}
}
return &storage.EncoderDecoder{}
}
var queue QueueStorage
switch {
case initConfig.UseNoOpStorage:
queue = &noop.Storage{}
case initConfig.UseLevelDB:
queue = leveldb.New(db)
queue = leveldb.New(db, func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
case redisClient == nil:
queue = slice.NewStorage()
queue = slice.NewStorage(func() storage.EncoderDecoderInterface { return &storage.EncoderDecoder{} })
default:
var err error
queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner)
queue, err = redisstorage.NewStorage(redisClient, "data-poster.queue", &initConfig.RedisSigner, encF)
if err != nil {
return nil, err
}
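
NewDataPoster now threads an encoder/decoder factory (encF) into every storage backend, choosing the legacy codec only when LegacyStorageEncoding is set; because the factory is a function, a hot-reloaded flag takes effect on the next call rather than being captured once. A minimal sketch of that factory-injection pattern; the interface shape is inferred from the call sites in this diff and deliberately simplified, not copied from the repository:

package main

import "fmt"

type item struct{ Nonce uint64 }

// encoderDecoder is a simplified stand-in for storage.EncoderDecoderInterface.
type encoderDecoder interface {
	Encode(*item) ([]byte, error)
	Decode([]byte) (*item, error)
}

// encoderDecoderF plays the role of storage.EncoderDecoderF: a factory, not an instance.
type encoderDecoderF func() encoderDecoder

type legacyCodec struct{}

func (legacyCodec) Encode(i *item) ([]byte, error) {
	return []byte(fmt.Sprintf("legacy:%d", i.Nonce)), nil
}

func (legacyCodec) Decode(b []byte) (*item, error) {
	var i item
	_, err := fmt.Sscanf(string(b), "legacy:%d", &i.Nonce)
	return &i, err
}

type currentCodec struct{}

func (currentCodec) Encode(i *item) ([]byte, error) {
	return []byte(fmt.Sprintf("v1:%d", i.Nonce)), nil
}

func (currentCodec) Decode(b []byte) (*item, error) {
	var i item
	_, err := fmt.Sscanf(string(b), "v1:%d", &i.Nonce)
	return &i, err
}

// chooseCodec mirrors encF: the closure re-reads the (possibly hot-reloaded)
// flag on every call, so the choice is never baked in at construction time.
func chooseCodec(legacyEncoding func() bool) encoderDecoderF {
	return func() encoderDecoder {
		if legacyEncoding() {
			return legacyCodec{}
		}
		return currentCodec{}
	}
}

func main() {
	useLegacy := true
	codec := chooseCodec(func() bool { return useLegacy })
	b, _ := codec().Encode(&item{Nonce: 7})
	fmt.Println(string(b)) // legacy:7
	useLegacy = false
	b, _ = codec().Encode(&item{Nonce: 7})
	fmt.Println(string(b)) // v1:7
}
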
@@ -162,19 +168,22 @@ func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) err
return nil
}

func (p *DataPoster) waitForL1Finality() bool {
return p.config().WaitForL1Finality && !p.headerReader.IsParentChainArbitrum()
}

// Requires the caller hold the mutex.
// Returns the next nonce, its metadata if stored, a bool indicating if the metadata is present, and an error.
// Unlike GetNextNonceAndMeta, this does not call the metadataRetriever if the metadata is not stored in the queue.
func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []byte, bool, error) {
config := p.config()
// Ensure latest finalized block state is available.
blockNum, err := p.client.BlockNumber(ctx)
if err != nil {
return 0, nil, false, err
}
lastQueueItem, err := p.queue.FetchLast(ctx)
if err != nil {
return 0, nil, false, err
return 0, nil, false, fmt.Errorf("fetching last element from queue: %w", err)
}
if lastQueueItem != nil {
nextNonce := lastQueueItem.Data.Nonce + 1
@@ -185,7 +194,7 @@ func (p *DataPoster) getNextNonceAndMaybeMeta(ctx context.Context) (uint64, []by
}

if err := p.updateNonce(ctx); err != nil {
if !p.queue.IsPersistent() && config.WaitForL1Finality {
if !p.queue.IsPersistent() && p.waitForL1Finality() {
return 0, nil, false, fmt.Errorf("error getting latest finalized nonce (and queue is not persistent): %w", err)
}
// Fall back to using a recent block to get the nonce. This is safe because there's nothing in the queue.
@@ -364,7 +373,10 @@ func (p *DataPoster) saveTx(ctx context.Context, prevTx, newTx *storage.QueuedTr
if prevTx != nil && prevTx.Data.Nonce != newTx.Data.Nonce {
return fmt.Errorf("prevTx nonce %v doesn't match newTx nonce %v", prevTx.Data.Nonce, newTx.Data.Nonce)
}
return p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx)
if err := p.queue.Put(ctx, newTx.Data.Nonce, prevTx, newTx); err != nil {
return fmt.Errorf("putting new tx in the queue: %w", err)
}
return nil
}

func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransaction, newTx *storage.QueuedTransaction) error {
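
saveTx and the queue reads above now wrap low-level failures with context instead of returning them bare. The point of fmt.Errorf with %w is that the original error stays in the chain, so sentinel checks such as errors.Is(err, storage.ErrStorageRace) in the batch poster keep matching after the extra context is added. A minimal sketch with illustrative names:

package main

import (
	"errors"
	"fmt"
)

// errStorageRace stands in for a sentinel like storage.ErrStorageRace.
var errStorageRace = errors.New("storage race")

func putInQueue() error { return errStorageRace }

// saveTx wraps the failure with context while preserving the sentinel via %w.
func saveTx() error {
	if err := putInQueue(); err != nil {
		return fmt.Errorf("putting new tx in the queue: %w", err)
	}
	return nil
}

func main() {
	err := saveTx()
	fmt.Println(err)                            // putting new tx in the queue: storage race
	fmt.Println(errors.Is(err, errStorageRace)) // true: the wrapped sentinel still matches
}
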
@@ -433,7 +445,7 @@ func (p *DataPoster) replaceTx(ctx context.Context, prevTx *storage.QueuedTransa
// The mutex must be held by the caller.
func (p *DataPoster) updateNonce(ctx context.Context) error {
var blockNumQuery *big.Int
if p.config().WaitForL1Finality {
if p.waitForL1Finality() {
blockNumQuery = big.NewInt(int64(rpc.FinalizedBlockNumber))
}
header, err := p.client.HeaderByNumber(ctx, blockNumQuery)
@@ -546,7 +558,7 @@ func (p *DataPoster) Start(ctxIn context.Context) {
// replacing them by fee.
queueContents, err := p.queue.FetchContents(ctx, unconfirmedNonce, maxTxsToRbf)
if err != nil {
log.Error("Failed to get tx queue contents", "err", err)
log.Error("Failed to fetch tx queue contents", "err", err)
return minWait
}
for index, tx := range queueContents {
@@ -602,20 +614,23 @@ type QueueStorage interface {
}

type DataPosterConfig struct {
RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"`
ReplacementTimes string `koanf:"replacement-times"`
WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"`
MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"`
MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"`
TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"`
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"`
NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"`
AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"`
UseLevelDB bool `koanf:"use-leveldb"`
UseNoOpStorage bool `koanf:"use-noop-storage"`
RedisSigner signature.SimpleHmacConfig `koanf:"redis-signer"`
ReplacementTimes string `koanf:"replacement-times"`
// This is forcibly disabled if the parent chain is an Arbitrum chain,
// so you should probably use DataPoster's waitForL1Finality method instead of reading this field directly.
WaitForL1Finality bool `koanf:"wait-for-l1-finality" reload:"hot"`
MaxMempoolTransactions uint64 `koanf:"max-mempool-transactions" reload:"hot"`
MaxQueuedTransactions int `koanf:"max-queued-transactions" reload:"hot"`
TargetPriceGwei float64 `koanf:"target-price-gwei" reload:"hot"`
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
MaxTipCapGwei float64 `koanf:"max-tip-cap-gwei" reload:"hot"`
NonceRbfSoftConfs uint64 `koanf:"nonce-rbf-soft-confs" reload:"hot"`
AllocateMempoolBalance bool `koanf:"allocate-mempool-balance" reload:"hot"`
UseLevelDB bool `koanf:"use-leveldb"`
UseNoOpStorage bool `koanf:"use-noop-storage"`
LegacyStorageEncoding bool `koanf:"legacy-storage-encoding" reload:"hot"`
}

// ConfigFetcher function type is used instead of directly passing config so
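
The comment on WaitForL1Finality above explains why the new waitForL1Finality accessor exists: the flag is forcibly ignored when the parent chain is an Arbitrum chain, and funneling every read through one method keeps that override from being forgotten at a call site. A hedged sketch of the guard-method pattern with simplified stand-in types:

package main

import "fmt"

type headerReader struct{ parentIsArbitrum bool }

func (h *headerReader) IsParentChainArbitrum() bool { return h.parentIsArbitrum }

type posterConfig struct{ WaitForL1Finality bool }

type dataPoster struct {
	config       func() posterConfig
	headerReader *headerReader
}

// waitForL1Finality mirrors the accessor added in this diff: the config flag
// only takes effect when the parent chain is not itself an Arbitrum chain.
func (p *dataPoster) waitForL1Finality() bool {
	return p.config().WaitForL1Finality && !p.headerReader.IsParentChainArbitrum()
}

func main() {
	p := &dataPoster{
		config:       func() posterConfig { return posterConfig{WaitForL1Finality: true} },
		headerReader: &headerReader{parentIsArbitrum: true},
	}
	fmt.Println(p.waitForL1Finality()) // false: the flag is overridden on an Arbitrum parent
}
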
@@ -636,6 +651,7 @@ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".allocate-mempool-balance", DefaultDataPosterConfig.AllocateMempoolBalance, "if true, don't put transactions in the mempool that spend a total greater than the batch poster's balance")
f.Bool(prefix+".use-leveldb", DefaultDataPosterConfig.UseLevelDB, "uses leveldb when enabled")
f.Bool(prefix+".use-noop-storage", DefaultDataPosterConfig.UseNoOpStorage, "uses noop storage, it doesn't store anything")
f.Bool(prefix+".legacy-storage-encoding", DefaultDataPosterConfig.LegacyStorageEncoding, "encodes items in a legacy way (as it was before dropping generics)")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

Expand All @@ -651,6 +667,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
AllocateMempoolBalance: true,
UseLevelDB: false,
UseNoOpStorage: false,
LegacyStorageEncoding: true,
}

var DefaultDataPosterConfigForValidator = func() DataPosterConfig {
26 changes: 9 additions & 17 deletions arbnode/dataposter/leveldb/leveldb.go
@@ -12,14 +12,14 @@ import (

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/syndtr/goleveldb/leveldb"
)

// Storage implements leveldb based storage for batch poster.
type Storage struct {
db ethdb.Database
db ethdb.Database
encDec storage.EncoderDecoderF
}

var (
@@ -31,16 +31,8 @@
countKey = []byte(".count_key")
)

func New(db ethdb.Database) *Storage {
return &Storage{db: db}
}

func (s *Storage) decodeItem(data []byte) (*storage.QueuedTransaction, error) {
var item storage.QueuedTransaction
if err := rlp.DecodeBytes(data, &item); err != nil {
return nil, fmt.Errorf("decoding item: %w", err)
}
return &item, nil
func New(db ethdb.Database, enc storage.EncoderDecoderF) *Storage {
return &Storage{db: db, encDec: enc}
}

func idxToKey(idx uint64) []byte {
@@ -55,7 +47,7 @@ func (s *Storage) FetchContents(_ context.Context, startingIndex uint64, maxResu
if !it.Next() {
break
}
item, err := s.decodeItem(it.Value())
item, err := s.encDec().Decode(it.Value())
if err != nil {
return nil, err
}
@@ -84,7 +76,7 @@ func (s *Storage) FetchLast(ctx context.Context) (*storage.QueuedTransaction, er
if err != nil {
return nil, err
}
return s.decodeItem(val)
return s.encDec().Decode(val)
}

func (s *Storage) Prune(ctx context.Context, until uint64) error {
@@ -117,7 +109,7 @@ func (s *Storage) valueAt(_ context.Context, key []byte) ([]byte, error) {
val, err := s.db.Get(key)
if err != nil {
if isErrNotFound(err) {
return rlp.EncodeToBytes((*storage.QueuedTransaction)(nil))
return s.encDec().Encode((*storage.QueuedTransaction)(nil))
}
return nil, err
}
@@ -130,14 +122,14 @@ func (s *Storage) Put(ctx context.Context, index uint64, prev, new *storage.Queu
if err != nil {
return err
}
prevEnc, err := rlp.EncodeToBytes(prev)
prevEnc, err := s.encDec().Encode(prev)
if err != nil {
return fmt.Errorf("encoding previous item: %w", err)
}
if !bytes.Equal(stored, prevEnc) {
return fmt.Errorf("replacing different item than expected at index: %v, stored: %v, prevEnc: %v", index, stored, prevEnc)
}
newEnc, err := rlp.EncodeToBytes(new)
newEnc, err := s.encDec().Encode(new)
if err != nil {
return fmt.Errorf("encoding new item: %w", err)
}
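
Put in the leveldb storage keeps its optimistic compare-and-swap shape after the codec injection: the caller's view of the previous item is re-encoded and compared byte-for-byte with what is stored, and valueAt returns the encoding of a nil item for missing keys so fresh inserts and replacements go through the same comparison. A hedged sketch of that pattern with simplified types and a JSON codec standing in for the injected encoder:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type queuedTx struct{ Nonce uint64 }

type store struct{ m map[string][]byte }

// valueAt returns the stored bytes, or the encoding of a nil item when the
// key is missing, mirroring the sentinel used by the leveldb storage above.
func (s *store) valueAt(key string) ([]byte, error) {
	if v, ok := s.m[key]; ok {
		return v, nil
	}
	return json.Marshal((*queuedTx)(nil))
}

// put only replaces the stored item if it still matches the caller's notion
// of the previous item, byte-for-byte after encoding.
func (s *store) put(key string, prev, new *queuedTx) error {
	stored, err := s.valueAt(key)
	if err != nil {
		return err
	}
	prevEnc, err := json.Marshal(prev)
	if err != nil {
		return err
	}
	if !bytes.Equal(stored, prevEnc) {
		return fmt.Errorf("replacing different item than expected at %q", key)
	}
	newEnc, err := json.Marshal(new)
	if err != nil {
		return err
	}
	s.m[key] = newEnc
	return nil
}

func main() {
	s := &store{m: map[string][]byte{}}
	fmt.Println(s.put("0", nil, &queuedTx{Nonce: 0}))                 // <nil>: fresh insert matches the nil sentinel
	fmt.Println(s.put("0", &queuedTx{Nonce: 1}, &queuedTx{Nonce: 2})) // error: stored item differs from prev
}
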