Skip to content

Commit

Permalink
Merge branch 'master' into improve-blobsyncing-errors
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuacolvin0 authored May 13, 2024
2 parents 05092df + 51189e4 commit 05ca225
Show file tree
Hide file tree
Showing 82 changed files with 969 additions and 648 deletions.
2 changes: 1 addition & 1 deletion arbitrator/arbutil/src/evm/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub trait EvmApi<D: DataReader>: Send + 'static {
) -> (eyre::Result<Bytes20>, u32, u64);

/// Returns the EVM return data.
/// Analogous to `vm.RETURNDATA`.
/// Analogous to `vm.RETURNDATACOPY`.
fn get_return_data(&self) -> D;

/// Emits an EVM log with the given number of topics and data, the first bytes of which should be the topic data.
Expand Down
6 changes: 3 additions & 3 deletions arbitrator/prover/src/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,9 @@ impl<'a> WasmBinary<'a> {
ink_left: ink_left.as_u32(),
ink_status: ink_status.as_u32(),
depth_left: depth_left.as_u32(),
init_cost: init.try_into()?,
cached_init_cost: cached_init.try_into()?,
asm_estimate: asm_estimate.try_into()?,
init_cost: init.try_into().wrap_err("init cost too high")?,
cached_init_cost: cached_init.try_into().wrap_err("cached cost too high")?,
asm_estimate: asm_estimate.try_into().wrap_err("asm estimate too large")?,
footprint,
user_main,
})
Expand Down
2 changes: 1 addition & 1 deletion arbitrator/prover/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ impl Machine {

pub fn jump_into_func(&mut self, module: u32, func: u32, mut args: Vec<Value>) -> Result<()> {
let Some(source_module) = self.modules.get(module as usize) else {
bail!("no module at offest {}", module.red())
bail!("no module at offset {}", module.red())
};
let Some(source_func) = source_module.funcs.get(func as usize) else {
bail!(
Expand Down
32 changes: 12 additions & 20 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ import (
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
Expand Down Expand Up @@ -98,7 +98,7 @@ type BatchPoster struct {
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dapWriter daprovider.Writer
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
Expand Down Expand Up @@ -129,7 +129,7 @@ const (

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
DisableDapFallbackStoreDataOnChain bool `koanf:"disable-dap-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Maximum 4844 blob enabled batch size.
Expand Down Expand Up @@ -189,7 +189,7 @@ type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Bool(prefix+".disable-dap-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDapFallbackStoreDataOnChain, "If unable to batch to DA provider, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxSize, "maximum batch size")
f.Int(prefix+".max-4844-batch-size", DefaultBatchPosterConfig.Max4844BatchSize, "maximum 4844 blob enabled batch size")
f.Duration(prefix+".max-delay", DefaultBatchPosterConfig.MaxDelay, "maximum batch posting delay")
Expand All @@ -214,7 +214,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {

var DefaultBatchPosterConfig = BatchPosterConfig{
Enable: false,
DisableDasFallbackStoreDataOnChain: false,
DisableDapFallbackStoreDataOnChain: false,
// This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go
MaxSize: 100000,
Max4844BatchSize: blobs.BlobEncodableData*(params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob) - 2000,
Expand Down Expand Up @@ -277,7 +277,7 @@ type BatchPosterOpts struct {
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
DAPWriter daprovider.Writer
ParentChainID *big.Int
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
dapWriter: opts.DAPWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
Expand Down Expand Up @@ -883,7 +883,7 @@ func (s *batchSegments) CloseAndGetBytes() ([]byte, error) {
}
compressedBytes := s.compressedBuffer.Bytes()
fullMsg := make([]byte, 1, len(compressedBytes)+1)
fullMsg[0] = arbstate.BrotliMessageHeaderByte
fullMsg[0] = daprovider.BrotliMessageHeaderByte
fullMsg = append(fullMsg, compressedBytes...)
return fullMsg, nil
}
Expand Down Expand Up @@ -1063,7 +1063,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
var use4844 bool
config := b.config()
if config.Post4844Blobs && b.daWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
if config.Post4844Blobs && b.dapWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageNumber(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1)))
if err != nil {
return false, err
Expand Down Expand Up @@ -1246,7 +1246,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
return false, nil
}

if b.daWriter != nil {
if b.dapWriter != nil {
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}
Expand All @@ -1258,17 +1258,9 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}, config.DisableDapFallbackStoreDataOnChain)
if err != nil {
return false, err
} else {
sequencerMsg = das.Serialize(cert)
}
}

Expand Down
46 changes: 29 additions & 17 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u
targetBlobCost := arbmath.BigMulByUint(newBlobFeeCap, blobGasUsed)
targetNonBlobCost := arbmath.BigSub(targetMaxCost, targetBlobCost)
newBaseFeeCap := arbmath.BigDivByUint(targetNonBlobCost, gasLimit)
if lastTx != nil && numBlobs > 0 && arbmath.BigDivToBips(newBaseFeeCap, lastTx.GasFeeCap()) < minRbfIncrease {
if lastTx != nil && numBlobs > 0 && lastTx.GasFeeCap().Sign() > 0 && arbmath.BigDivToBips(newBaseFeeCap, lastTx.GasFeeCap()) < minRbfIncrease {
// Increase the non-blob fee cap to the minimum rbf increase
newBaseFeeCap = arbmath.BigMulByBips(lastTx.GasFeeCap(), minRbfIncrease)
newNonBlobCost := arbmath.BigMulByUint(newBaseFeeCap, gasLimit)
Expand Down Expand Up @@ -665,6 +665,14 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, nonce uint64, gasLimit u
return lastTx.GasFeeCap(), lastTx.GasTipCap(), lastTx.BlobGasFeeCap(), nil
}

// Ensure we bid at least 1 wei to prevent division by zero
if newBaseFeeCap.Sign() == 0 {
newBaseFeeCap = big.NewInt(1)
}
if newBlobFeeCap.Sign() == 0 {
newBlobFeeCap = big.NewInt(1)
}

return newBaseFeeCap, newTipCap, newBlobFeeCap, nil
}

Expand Down Expand Up @@ -844,21 +852,25 @@ func (p *DataPoster) sendTx(ctx context.Context, prevTx *storage.QueuedTransacti
if err != nil {
return fmt.Errorf("couldn't get preceding tx in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
if precedingTx != nil && // precedingTx == nil -> the actual preceding tx was already confirmed
(precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent) {
latestBlockNumber, err := p.client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
prevBlockNumber := arbmath.SaturatingUSub(latestBlockNumber, 1)
reorgResistantNonce, err := p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber))
if err != nil {
return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
if precedingTx != nil { // precedingTx == nil -> the actual preceding tx was already confirmed
var latestBlockNumber, prevBlockNumber, reorgResistantNonce uint64
if precedingTx.FullTx.Type() != newTx.FullTx.Type() || !precedingTx.Sent {
latestBlockNumber, err = p.client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("couldn't get block number in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}
prevBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, 1)
reorgResistantNonce, err = p.client.NonceAt(ctx, p.Sender(), new(big.Int).SetUint64(prevBlockNumber))
if err != nil {
return fmt.Errorf("couldn't determine reorg resistant nonce in DataPoster to check if should send tx with nonce %d: %w", newTx.FullTx.Nonce(), err)
}

if precedingTx.FullTx.Nonce() > reorgResistantNonce {
log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent)
return nil
if precedingTx.FullTx.Nonce() > reorgResistantNonce {
log.Info("DataPoster is avoiding creating a mempool nonce gap (the tx remains queued and will be retried)", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent)
return nil
}
} else {
log.Info("DataPoster will send previously unsent batch tx", "nonce", newTx.FullTx.Nonce(), "prevType", precedingTx.FullTx.Type(), "type", newTx.FullTx.Type(), "prevSent", precedingTx.Sent, "latestBlockNumber", latestBlockNumber, "prevBlockNumber", prevBlockNumber, "reorgResistantNonce", reorgResistantNonce)
}
}
}
Expand Down Expand Up @@ -930,8 +942,8 @@ func (p *DataPoster) replaceTx(ctx context.Context, prevTx *storage.QueuedTransa
}

newTx := *prevTx
if arbmath.BigDivToBips(newFeeCap, prevTx.FullTx.GasFeeCap()) < minRbfIncrease ||
(prevTx.FullTx.BlobGasFeeCap() != nil && arbmath.BigDivToBips(newBlobFeeCap, prevTx.FullTx.BlobGasFeeCap()) < minRbfIncrease) {
if (prevTx.FullTx.GasFeeCap().Sign() > 0 && arbmath.BigDivToBips(newFeeCap, prevTx.FullTx.GasFeeCap()) < minRbfIncrease) ||
(prevTx.FullTx.BlobGasFeeCap() != nil && prevTx.FullTx.BlobGasFeeCap().Sign() > 0 && arbmath.BigDivToBips(newBlobFeeCap, prevTx.FullTx.BlobGasFeeCap()) < minRbfIncrease) {
log.Debug(
"no need to replace by fee transaction",
"nonce", prevTx.FullTx.Nonce(),
Expand Down
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, nil)
tracker, err := NewInboxTracker(db, streamer, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
31 changes: 13 additions & 18 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
Expand All @@ -37,23 +38,17 @@ type InboxTracker struct {
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
blobReader arbstate.BlobReader
dapReaders []daprovider.Reader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
}
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
das: das,
blobReader: blobReader,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
Expand Down Expand Up @@ -302,7 +297,14 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
if err != nil {
return fmt.Errorf("error getting message %v: %w", seqNum, err)
}
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum)

msgResult, err := t.txStreamer.ResultAtCount(seqNum)
var blockHash *common.Hash
if err == nil {
blockHash = &msgResult.BlockHash
}

feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
Expand Down Expand Up @@ -659,14 +661,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
var daProviders []arbstate.DataAvailabilityProvider
if t.das != nil {
daProviders = append(daProviders, arbstate.NewDAProviderDAS(t.das))
}
if t.blobReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.dapReaders, daprovider.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
34 changes: 24 additions & 10 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcastclients"
Expand Down Expand Up @@ -254,7 +254,7 @@ type Node struct {
L1Reader *headerreader.HeaderReader
TxStreamer *TransactionStreamer
DeployInfo *chaininfo.RollupAddresses
BlobReader arbstate.BlobReader
BlobReader daprovider.BlobReader
InboxReader *InboxReader
InboxTracker *InboxTracker
DelayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -372,7 +372,7 @@ func createNodeImpl(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
blobReader arbstate.BlobReader,
blobReader daprovider.BlobReader,
) (*Node, error) {
config := configFetcher.Get()

Expand Down Expand Up @@ -529,7 +529,18 @@ func createNodeImpl(
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader)
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && daReader == nil {
return nil, errors.New("data availability service required but unconfigured")
}
var dapReaders []daprovider.Reader
if daReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader))
}
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders)
if err != nil {
return nil, err
}
Expand All @@ -547,8 +558,7 @@ func createNodeImpl(
txStreamer,
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
blobReader,
dapReaders,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
)
Expand Down Expand Up @@ -655,6 +665,10 @@ func createNodeImpl(
if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" {
return nil, errors.New("batchposter, but no TxOpts")
}
var dapWriter daprovider.Writer
if daWriter != nil {
dapWriter = daprovider.NewWriterForDAS(daWriter)
}
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Expand All @@ -665,7 +679,7 @@ func createNodeImpl(
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
DAPWriter: dapWriter,
ParentChainID: parentChainID,
})
if err != nil {
Expand Down Expand Up @@ -725,7 +739,7 @@ func CreateNode(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
blobReader arbstate.BlobReader,
blobReader daprovider.BlobReader,
) (*Node, error) {
currentNode, err := createNodeImpl(ctx, stack, exec, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
if err != nil {
Expand Down Expand Up @@ -997,8 +1011,8 @@ func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex,
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult)
}

func (n *Node) ExpectChosenSequencer() error {
Expand Down
Loading

0 comments on commit 05ca225

Please sign in to comment.