diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 699eb3e8f6..9ad984ae6c 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { defer cancel() exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) - tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) + tracker, err := NewInboxTracker(db, streamer, nil) Require(t, err) err = streamer.Start(ctx) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 3ba9aa78f3..6286bff385 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -32,6 +32,9 @@ type InboxReaderConfig struct { TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"` MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"` ReadMode string `koanf:"read-mode" reload:"hot"` + EnableSnapSync bool `koanf:"enable-snap-sync"` + // SnapSyncTest is only used for testing purposes, these should not be configured in production. + SnapSyncTest SnapSyncConfig } type InboxReaderConfigFetcher func() *InboxReaderConfig @@ -56,6 +59,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once") f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized") + f.Bool(prefix+".enable-snap-sync", false, "enable snap sync") } var DefaultInboxReaderConfig = InboxReaderConfig{ @@ -95,13 +99,14 @@ type InboxReader struct { caughtUpChan chan struct{} client arbutil.L1Interface l1Reader *headerreader.HeaderReader + rollupAddress common.Address // Atomic lastSeenBatchCount uint64 lastReadBatchCount uint64 } -func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { +func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, rollupAddress common.Address, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { err := config().Validate() if err != nil { return nil, err @@ -115,12 +120,40 @@ func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader firstMessageBlock: firstMessageBlock, caughtUpChan: make(chan struct{}), config: config, + rollupAddress: rollupAddress, }, nil } func (r *InboxReader) Start(ctxIn context.Context) error { r.StopWaiter.Start(ctxIn, r) hadError := false + if r.config().EnableSnapSync { + batchCount, err := r.tracker.GetBatchCount() + if err != nil { + return err + } + if batchCount == 0 { + snapSyncConfig := r.fetchSnapSyncParameters() + r.tracker.SetSnapSyncParameters(snapSyncConfig) + r.tracker.txStreamer.SetSnapSyncParameters(snapSyncConfig) + firstMessageToRead := snapSyncConfig.DelayedCount + if firstMessageToRead > snapSyncConfig.BatchCount { + firstMessageToRead = snapSyncConfig.BatchCount + } + if firstMessageToRead > 0 { + firstMessageToRead-- + } + // Find the first block containing the first message to read + // Subtract 1 to get the block before the first message to read, + // this is done to fetch previous batch metadata needed for snap sync. + block, err := FindBlockContainingBatch(ctxIn, r.rollupAddress, r.client, snapSyncConfig.ParentChainAssertionBlock, firstMessageToRead-1) + if err != nil { + return err + } + r.firstMessageBlock.SetUint64(block) + } + + } r.CallIteratively(func(ctx context.Context) time.Duration { err := r.run(ctx, hadError) if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") { @@ -139,7 +172,7 @@ func (r *InboxReader) Start(ctxIn context.Context) error { return err } if batchCount > 0 { - if r.tracker.snapSyncConfig.Enabled { + if r.config().EnableSnapSync { break } // Validate the init message matches our L2 blockchain @@ -172,6 +205,12 @@ func (r *InboxReader) Start(ctxIn context.Context) error { return nil } +func (r *InboxReader) fetchSnapSyncParameters() SnapSyncConfig { + // In the future, we will implement a way to fetch this is from other nodes, + // but for now we will just use the test config + return r.config().SnapSyncTest +} + // assumes l1block is recent so we could do a simple-search from the end func (r *InboxReader) recentParentChainBlockToMsg(ctx context.Context, parentChainBlock uint64) (arbutil.MessageIndex, error) { batch, err := r.tracker.GetBatchCount() diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 259a25f439..5c879743a4 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -66,7 +66,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (* Fail(t, err) } execSeq := &execClientWrapper{execEngine, t} - inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig) + inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher) if err != nil { Fail(t, err) } diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 1aa0ff7687..10ff5b826f 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -45,13 +45,12 @@ type InboxTracker struct { batchMeta *containers.LruCache[uint64, BatchMetadata] } -func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) { +func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) { tracker := &InboxTracker{ - db: db, - txStreamer: txStreamer, - dapReaders: dapReaders, - batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), - snapSyncConfig: snapSyncConfig, + db: db, + txStreamer: txStreamer, + dapReaders: dapReaders, + batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), } return tracker, nil } @@ -225,6 +224,10 @@ func (t *InboxTracker) GetBatchCount() (uint64, error) { return count, nil } +func (t *InboxTracker) SetSnapSyncParameters(config SnapSyncConfig) { + t.snapSyncConfig = config +} + // err will return unexpected/internal errors // bool will be false if batch not found (meaning, block not yet posted on a batch) func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) { diff --git a/arbnode/node.go b/arbnode/node.go index b4321f7767..c5ba0f4f91 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -93,8 +93,6 @@ type Config struct { TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` - // SnapSyncConfig is only used for testing purposes, these should not be configured in production. - SnapSyncTest SnapSyncConfig } func (c *Config) Validate() error { @@ -177,7 +175,6 @@ var ConfigDefault = Config{ TransactionStreamer: DefaultTransactionStreamerConfig, ResourceMgmt: resourcemanager.DefaultConfig, Maintenance: DefaultMaintenanceConfig, - SnapSyncTest: DefaultSnapSyncConfig, } func ConfigDefaultL1Test() *Config { @@ -285,15 +282,6 @@ type SnapSyncConfig struct { ParentChainAssertionBlock uint64 } -var DefaultSnapSyncConfig = SnapSyncConfig{ - Enabled: false, - PrevBatchMessageCount: 0, - PrevDelayedRead: 0, - BatchCount: 0, - DelayedCount: 0, - ParentChainAssertionBlock: 0, -} - type ConfigFetcher interface { Get() *Config Start(context.Context) @@ -431,7 +419,7 @@ func createNodeImpl( } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer } - txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest) + txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher) if err != nil { return nil, err } @@ -561,29 +549,11 @@ func createNodeImpl( if blobReader != nil { dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader)) } - inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest) + inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders) if err != nil { return nil, err } - firstMessageBlock := new(big.Int).SetUint64(deployInfo.DeployedAt) - if config.SnapSyncTest.Enabled { - firstMessageToRead := config.SnapSyncTest.DelayedCount - if firstMessageToRead > config.SnapSyncTest.BatchCount { - firstMessageToRead = config.SnapSyncTest.BatchCount - } - if firstMessageToRead > 0 { - firstMessageToRead-- - } - // Find the first block containing the first message to read - // Subtract 1 to get the block before the first message to read, - // this is done to fetch previous batch metadata needed for snap sync. - block, err := FindBlockContainingBatch(ctx, deployInfo.Rollup, l1client, config.SnapSyncTest.ParentChainAssertionBlock, firstMessageToRead-1) - if err != nil { - return nil, err - } - firstMessageBlock.SetUint64(block) - } - inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, firstMessageBlock, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }) + inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), deployInfo.Rollup, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }) if err != nil { return nil, err } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index c948bd8169..279aecdbd0 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -53,7 +53,7 @@ type TransactionStreamer struct { db ethdb.Database fatalErrChan chan<- error config TransactionStreamerConfigFetcher - snapSyncConfig *SnapSyncConfig + snapSyncConfig SnapSyncConfig insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held reorgMutex sync.RWMutex @@ -107,7 +107,6 @@ func NewTransactionStreamer( broadcastServer *broadcaster.Broadcaster, fatalErrChan chan<- error, config TransactionStreamerConfigFetcher, - snapSyncConfig *SnapSyncConfig, ) (*TransactionStreamer, error) { streamer := &TransactionStreamer{ exec: exec, @@ -117,7 +116,6 @@ func NewTransactionStreamer( broadcastServer: broadcastServer, fatalErrChan: fatalErrChan, config: config, - snapSyncConfig: snapSyncConfig, cachedL1PriceData: &L1PriceData{ msgToL1PriceData: []L1PriceDataOfMsg{}, }, @@ -795,6 +793,10 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockHash, batch) } +func (s *TransactionStreamer) SetSnapSyncParameters(config SnapSyncConfig) { + s.snapSyncConfig = config +} + func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) { if s.snapSyncConfig.Enabled && uint64(pos) == s.snapSyncConfig.PrevBatchMessageCount { return s.snapSyncConfig.PrevDelayedRead, nil diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index e011af9019..3ef888e897 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node return nil, fmt.Errorf("failed to get finalized block: %w", err) } l1BlockNum := l1Block.NumberU64() - tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, arbnode.DefaultSnapSyncConfig) + tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil) if err != nil { return nil, err } diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index a0a349a1d9..0fb1e8fee0 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -77,11 +77,12 @@ func TestSnapSync(t *testing.T) { Require(t, err) // Create a config with snap sync enabled and same database directory as the 2nd node nodeConfig := builder.nodeConfig - nodeConfig.SnapSyncTest.Enabled = true - nodeConfig.SnapSyncTest.BatchCount = batchCount - nodeConfig.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1 - nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead - nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) + nodeConfig.InboxReader.EnableSnapSync = true + nodeConfig.InboxReader.SnapSyncTest.Enabled = true + nodeConfig.InboxReader.SnapSyncTest.BatchCount = batchCount + nodeConfig.InboxReader.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1 + nodeConfig.InboxReader.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead + nodeConfig.InboxReader.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) // Cleanup the message data of 2nd node, but keep the block state data. // This is to simulate a snap sync environment where we’ve just gotten the block state but don’t have any messages. err = os.RemoveAll(nodeB.ConsensusNode.Stack.ResolvePath("arbitrumdata"))