Skip to content

Commit

Permalink
Merge branch 'snap_sync' into snap_sync_v1
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi authored May 9, 2024
2 parents bc82da2 + edfa8f0 commit 74faac0
Show file tree
Hide file tree
Showing 76 changed files with 853 additions and 625 deletions.
2 changes: 1 addition & 1 deletion arbitrator/arbutil/src/evm/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub trait EvmApi<D: DataReader>: Send + 'static {
) -> (eyre::Result<Bytes20>, u32, u64);

/// Returns the EVM return data.
/// Analogous to `vm.RETURNDATA`.
/// Analogous to `vm.RETURNDATACOPY`.
fn get_return_data(&self) -> D;

/// Emits an EVM log with the given number of topics and data, the first bytes of which should be the topic data.
Expand Down
6 changes: 3 additions & 3 deletions arbitrator/prover/src/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,9 @@ impl<'a> WasmBinary<'a> {
ink_left: ink_left.as_u32(),
ink_status: ink_status.as_u32(),
depth_left: depth_left.as_u32(),
init_cost: init.try_into()?,
cached_init_cost: cached_init.try_into()?,
asm_estimate: asm_estimate.try_into()?,
init_cost: init.try_into().wrap_err("init cost too high")?,
cached_init_cost: cached_init.try_into().wrap_err("cached cost too high")?,
asm_estimate: asm_estimate.try_into().wrap_err("asm estimate too large")?,
footprint,
user_main,
})
Expand Down
2 changes: 1 addition & 1 deletion arbitrator/prover/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ impl Machine {

pub fn jump_into_func(&mut self, module: u32, func: u32, mut args: Vec<Value>) -> Result<()> {
let Some(source_module) = self.modules.get(module as usize) else {
bail!("no module at offest {}", module.red())
bail!("no module at offset {}", module.red())
};
let Some(source_func) = source_module.funcs.get(func as usize) else {
bail!(
Expand Down
32 changes: 12 additions & 20 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ import (
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
Expand Down Expand Up @@ -98,7 +98,7 @@ type BatchPoster struct {
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
dapWriter daprovider.Writer
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
Expand Down Expand Up @@ -129,7 +129,7 @@ const (

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
DisableDapFallbackStoreDataOnChain bool `koanf:"disable-dap-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Maximum 4844 blob enabled batch size.
Expand Down Expand Up @@ -189,7 +189,7 @@ type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Bool(prefix+".disable-dap-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDapFallbackStoreDataOnChain, "If unable to batch to DA provider, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxSize, "maximum batch size")
f.Int(prefix+".max-4844-batch-size", DefaultBatchPosterConfig.Max4844BatchSize, "maximum 4844 blob enabled batch size")
f.Duration(prefix+".max-delay", DefaultBatchPosterConfig.MaxDelay, "maximum batch posting delay")
Expand All @@ -214,7 +214,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {

var DefaultBatchPosterConfig = BatchPosterConfig{
Enable: false,
DisableDasFallbackStoreDataOnChain: false,
DisableDapFallbackStoreDataOnChain: false,
// This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go
MaxSize: 100000,
Max4844BatchSize: blobs.BlobEncodableData*(params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob) - 2000,
Expand Down Expand Up @@ -277,7 +277,7 @@ type BatchPosterOpts struct {
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
DAPWriter daprovider.Writer
ParentChainID *big.Int
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
dapWriter: opts.DAPWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
Expand Down Expand Up @@ -883,7 +883,7 @@ func (s *batchSegments) CloseAndGetBytes() ([]byte, error) {
}
compressedBytes := s.compressedBuffer.Bytes()
fullMsg := make([]byte, 1, len(compressedBytes)+1)
fullMsg[0] = arbstate.BrotliMessageHeaderByte
fullMsg[0] = daprovider.BrotliMessageHeaderByte
fullMsg = append(fullMsg, compressedBytes...)
return fullMsg, nil
}
Expand Down Expand Up @@ -1063,7 +1063,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
var use4844 bool
config := b.config()
if config.Post4844Blobs && b.daWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
if config.Post4844Blobs && b.dapWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageNumber(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1)))
if err != nil {
return false, err
Expand Down Expand Up @@ -1246,7 +1246,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
return false, nil
}

if b.daWriter != nil {
if b.dapWriter != nil {
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}
Expand All @@ -1258,17 +1258,9 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}, config.DisableDapFallbackStoreDataOnChain)
if err != nil {
return false, err
} else {
sequencerMsg = das.Serialize(cert)
}
}

Expand Down
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, nil, DefaultSnapSyncConfig)
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
31 changes: 13 additions & 18 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
Expand All @@ -37,24 +38,18 @@ type InboxTracker struct {
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
blobReader arbstate.BlobReader
dapReaders []daprovider.Reader
snapSyncConfig SnapSyncConfig

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
}
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
das: das,
blobReader: blobReader,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
}
Expand Down Expand Up @@ -304,7 +299,14 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
if err != nil {
return fmt.Errorf("error getting message %v: %w", seqNum, err)
}
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum)

msgResult, err := t.txStreamer.ResultAtCount(seqNum)
var blockHash *common.Hash
if err == nil {
blockHash = &msgResult.BlockHash
}

feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
Expand Down Expand Up @@ -706,14 +708,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
var daProviders []arbstate.DataAvailabilityProvider
if t.das != nil {
daProviders = append(daProviders, arbstate.NewDAProviderDAS(t.das))
}
if t.blobReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.dapReaders, daprovider.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
34 changes: 24 additions & 10 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcastclients"
Expand Down Expand Up @@ -257,7 +257,7 @@ type Node struct {
L1Reader *headerreader.HeaderReader
TxStreamer *TransactionStreamer
DeployInfo *chaininfo.RollupAddresses
BlobReader arbstate.BlobReader
BlobReader daprovider.BlobReader
InboxReader *InboxReader
InboxTracker *InboxTracker
DelayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -400,7 +400,7 @@ func createNodeImpl(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
blobReader arbstate.BlobReader,
blobReader daprovider.BlobReader,
) (*Node, error) {
config := configFetcher.Get()

Expand Down Expand Up @@ -558,7 +558,18 @@ func createNodeImpl(
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, config.SnapSync)
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && daReader == nil {
return nil, errors.New("data availability service required but unconfigured")
}
var dapReaders []daprovider.Reader
if daReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForDAS(daReader))
}
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSync)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -594,8 +605,7 @@ func createNodeImpl(
txStreamer,
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
blobReader,
dapReaders,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
)
Expand Down Expand Up @@ -702,6 +712,10 @@ func createNodeImpl(
if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" {
return nil, errors.New("batchposter, but no TxOpts")
}
var dapWriter daprovider.Writer
if daWriter != nil {
dapWriter = daprovider.NewWriterForDAS(daWriter)
}
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Expand All @@ -712,7 +726,7 @@ func createNodeImpl(
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
DAPWriter: dapWriter,
ParentChainID: parentChainID,
})
if err != nil {
Expand Down Expand Up @@ -809,7 +823,7 @@ func CreateNode(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
blobReader arbstate.BlobReader,
blobReader daprovider.BlobReader,
) (*Node, error) {
currentNode, err := createNodeImpl(ctx, stack, exec, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
if err != nil {
Expand Down Expand Up @@ -1081,8 +1095,8 @@ func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex,
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult)
}

func (n *Node) ExpectChosenSequencer() error {
Expand Down
4 changes: 2 additions & 2 deletions arbnode/sequencer_inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"

"github.com/offchainlabs/nitro/solgen/go/bridgegen"
Expand Down Expand Up @@ -159,7 +159,7 @@ func (m *SequencerInboxBatch) getSequencerData(ctx context.Context, client arbut
if len(tx.BlobHashes()) == 0 {
return nil, fmt.Errorf("blob batch transaction %v has no blobs", tx.Hash())
}
data := []byte{arbstate.BlobHashesHeaderFlag}
data := []byte{daprovider.BlobHashesHeaderFlag}
for _, h := range tx.BlobHashes() {
data = append(data, h[:]...)
}
Expand Down
Loading

0 comments on commit 74faac0

Please sign in to comment.