From 1588fb7794d907a9ffb8d821c27bfeb68e79596a Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 30 Apr 2024 18:56:39 +0530 Subject: [PATCH 01/21] V0 Snap Sync --- arbnode/delayed_seq_reorg_test.go | 2 +- arbnode/inbox_reader.go | 7 +++ arbnode/inbox_tracker.go | 77 +++++++++++++++++++++++-------- arbnode/node.go | 2 +- cmd/pruning/pruning.go | 2 +- system_tests/common_test.go | 2 +- system_tests/sync_test.go | 75 ++++++++++++++++++++++++++++++ 7 files changed, 145 insertions(+), 22 deletions(-) create mode 100644 system_tests/sync_test.go diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index beb2656e2b..8572673fba 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { defer cancel() exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) - tracker, err := NewInboxTracker(db, streamer, nil, nil) + tracker, err := NewInboxTracker(db, streamer, nil, nil, 0) Require(t, err) err = streamer.Start(ctx) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index a1f1a1a930..2d3afbbb3a 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -32,6 +32,7 @@ type InboxReaderConfig struct { TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"` MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"` ReadMode string `koanf:"read-mode" reload:"hot"` + FirstBatchToKeep uint64 `koanf:"first-batch-to-keep" reload:"hot"` } type InboxReaderConfigFetcher func() *InboxReaderConfig @@ -56,6 +57,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once") f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. 
Takes string input, valid strings- latest, safe, finalized") + f.Uint64(prefix+".first-batch-to-keep", DefaultInboxReaderConfig.FirstBatchToKeep, "the first batch to keep in the database while adding messages") } var DefaultInboxReaderConfig = InboxReaderConfig{ @@ -67,6 +69,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{ TargetMessagesRead: 500, MaxBlocksToRead: 2000, ReadMode: "latest", + FirstBatchToKeep: 0, } var TestInboxReaderConfig = InboxReaderConfig{ @@ -78,6 +81,7 @@ var TestInboxReaderConfig = InboxReaderConfig{ TargetMessagesRead: 500, MaxBlocksToRead: 2000, ReadMode: "latest", + FirstBatchToKeep: 0, } type InboxReader struct { @@ -139,6 +143,9 @@ func (r *InboxReader) Start(ctxIn context.Context) error { return err } if batchCount > 0 { + if r.tracker.GetFirstBatchToKeep() != 0 { + break + } // Validate the init message matches our L2 blockchain message, err := r.tracker.GetDelayedMessage(0) if err != nil { diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index b758e95e62..8341ed0e93 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -33,28 +33,30 @@ var ( ) type InboxTracker struct { - db ethdb.Database - txStreamer *TransactionStreamer - mutex sync.Mutex - validator *staker.BlockValidator - das arbstate.DataAvailabilityReader - blobReader arbstate.BlobReader + db ethdb.Database + txStreamer *TransactionStreamer + mutex sync.Mutex + validator *staker.BlockValidator + das arbstate.DataAvailabilityReader + blobReader arbstate.BlobReader + firstBatchToKeep uint64 batchMetaMutex sync.Mutex batchMeta *containers.LruCache[uint64, BatchMetadata] } -func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) { +func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader, firstBatchToKeep uint64) (*InboxTracker, error) { // We support a nil txStreamer for the pruning code if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil { return nil, errors.New("data availability service required but unconfigured") } tracker := &InboxTracker{ - db: db, - txStreamer: txStreamer, - das: das, - blobReader: blobReader, - batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), + db: db, + txStreamer: txStreamer, + das: das, + blobReader: blobReader, + batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), + firstBatchToKeep: firstBatchToKeep, } return tracker, nil } @@ -209,6 +211,10 @@ func (t *InboxTracker) GetBatchParentChainBlock(seqNum uint64) (uint64, error) { return metadata.ParentChainBlock, err } +func (t *InboxTracker) GetFirstBatchToKeep() uint64 { + return t.firstBatchToKeep +} + // GetBatchAcc is a convenience function wrapping GetBatchMetadata func (t *InboxTracker) GetBatchAcc(seqNum uint64) (common.Hash, error) { metadata, err := t.GetBatchMetadata(seqNum) @@ -383,6 +389,21 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) { } func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error { + var nextAcc common.Hash + for len(messages) > 0 { + pos, err := messages[0].Message.Header.SeqNum() + if err != nil { + return err + } + if pos+1 == t.firstBatchToKeep { + nextAcc = messages[0].AfterInboxAcc() + } + if pos < t.firstBatchToKeep { + messages = messages[1:] + } else { + break + } + } if len(messages) == 0 { return nil } @@ 
-407,8 +428,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR } } - var nextAcc common.Hash - if pos > 0 { + if pos > t.firstBatchToKeep { var err error nextAcc, err = t.GetDelayedAcc(pos - 1) if err != nil { @@ -596,6 +616,28 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different") func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch) error { + var nextAcc common.Hash + var prevbatchmeta BatchMetadata + sequenceNumberToKeep := t.firstBatchToKeep + if sequenceNumberToKeep > 0 { + sequenceNumberToKeep-- + } + for len(batches) > 0 { + if batches[0].SequenceNumber+1 == sequenceNumberToKeep { + nextAcc = batches[0].AfterInboxAcc + prevbatchmeta = BatchMetadata{ + Accumulator: batches[0].AfterInboxAcc, + DelayedMessageCount: batches[0].AfterDelayedCount, + //MessageCount: batchMessageCounts[batches[0].SequenceNumber], + ParentChainBlock: batches[0].ParentChainBlockNumber, + } + } + if batches[0].SequenceNumber < sequenceNumberToKeep { + batches = batches[1:] + } else { + break + } + } if len(batches) == 0 { return nil } @@ -604,9 +646,8 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L pos := batches[0].SequenceNumber startPos := pos - var nextAcc common.Hash - var prevbatchmeta BatchMetadata - if pos > 0 { + + if pos > sequenceNumberToKeep { var err error prevbatchmeta, err = t.GetBatchMetadata(pos - 1) nextAcc = prevbatchmeta.Accumulator @@ -631,7 +672,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L return errors.New("previous batch accumulator mismatch") } - if batch.AfterDelayedCount > 0 { + if batch.AfterDelayedCount > 0 && t.firstBatchToKeep == 0 { haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1) if errors.Is(err, AccumulatorNotFoundErr) { // We somehow missed a referenced delayed message; go back and look for it diff --git a/arbnode/node.go b/arbnode/node.go index 7a7a99ba88..f687942d57 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -529,7 +529,7 @@ func createNodeImpl( return nil, errors.New("a data availability service is required for this chain, but it was not configured") } - inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader) + inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, config.InboxReader.FirstBatchToKeep) if err != nil { return nil, err } diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index c483526aa1..c65df7d441 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node return nil, fmt.Errorf("failed to get finalized block: %w", err) } l1BlockNum := l1Block.NumberU64() - tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil) + tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 7f9f4844fd..bff4ca430f 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -336,7 +336,7 @@ func BridgeBalance( break } TransferBalance(t, "Faucet", "User", big.NewInt(1), l1info, l1client, ctx) - if i > 20 { + if i > 50 { Fatal(t, "bridging failed") } <-time.After(time.Millisecond * 100) diff --git a/system_tests/sync_test.go 
b/system_tests/sync_test.go new file mode 100644 index 0000000000..97c5118598 --- /dev/null +++ b/system_tests/sync_test.go @@ -0,0 +1,75 @@ +// Copyright 2021-2022, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbtest + +import ( + "context" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" + "github.com/offchainlabs/nitro/arbos/l2pricing" + "github.com/offchainlabs/nitro/util" + "math/big" + "os" + "testing" + "time" +) + +func TestSync(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + var transferGas = util.NormalizeL2GasForL1GasInitial(800_000, params.GWei) // include room for aggregator L1 costs + + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.L2Info = NewBlockChainTestInfo( + t, + types.NewArbitrumSigner(types.NewLondonSigner(builder.chainConfig.ChainID)), big.NewInt(l2pricing.InitialBaseFeeWei*2), + transferGas, + ) + cleanup := builder.Build(t) + defer cleanup() + + builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) + + builder.L2Info.GenerateAccount("BackgroundUser") + for { + tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) + err := builder.L2.Client.SendTransaction(ctx, tx) + Require(t, err) + _, err = builder.L2.EnsureTxSucceeded(tx) + Require(t, err) + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + if count > 10 { + break + } + } + <-time.After(time.Second * 5) + + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + nodeConfig := builder.nodeConfig + nodeConfig.InboxReader.FirstBatchToKeep = count + + err = os.RemoveAll(builder.l2StackConfig.ResolvePath("arbitrumdata")) + Require(t, err) + + builder.L2.cleanup() + + nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig, nodeConfig: nodeConfig}) + defer cleanupB() + for { + tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) + err := nodeB.Client.SendTransaction(ctx, tx) + Require(t, err) + _, err = nodeB.EnsureTxSucceeded(tx) + Require(t, err) + count, err := nodeB.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + if count > 20 { + break + } + } + +} From 0a4b8b72e1839db5563d332e781dc644e59123dc Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 2 May 2024 18:52:42 +0530 Subject: [PATCH 02/21] fixes --- arbnode/delayed_seq_reorg_test.go | 2 +- arbnode/inbox_reader.go | 6 +- arbnode/inbox_test.go | 3 +- arbnode/inbox_tracker.go | 104 ++++++++++-------- arbnode/node.go | 33 +++++- arbnode/transaction_streamer.go | 13 ++- cmd/pruning/pruning.go | 2 +- system_tests/common_test.go | 2 +- .../{sync_test.go => snap_sync_test.go} | 23 ++-- 9 files changed, 119 insertions(+), 69 deletions(-) rename system_tests/{sync_test.go => snap_sync_test.go} (70%) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 8572673fba..455a503546 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { defer cancel() exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) - tracker, err := NewInboxTracker(db, streamer, nil, nil, 0) + tracker, err := NewInboxTracker(db, streamer, nil, nil, DefaultSnapSyncConfig) Require(t, err) err = 
streamer.Start(ctx) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 2d3afbbb3a..3ba9aa78f3 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -32,7 +32,6 @@ type InboxReaderConfig struct { TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"` MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"` ReadMode string `koanf:"read-mode" reload:"hot"` - FirstBatchToKeep uint64 `koanf:"first-batch-to-keep" reload:"hot"` } type InboxReaderConfigFetcher func() *InboxReaderConfig @@ -57,7 +56,6 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once") f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized") - f.Uint64(prefix+".first-batch-to-keep", DefaultInboxReaderConfig.FirstBatchToKeep, "the first batch to keep in the database while adding messages") } var DefaultInboxReaderConfig = InboxReaderConfig{ @@ -69,7 +67,6 @@ var DefaultInboxReaderConfig = InboxReaderConfig{ TargetMessagesRead: 500, MaxBlocksToRead: 2000, ReadMode: "latest", - FirstBatchToKeep: 0, } var TestInboxReaderConfig = InboxReaderConfig{ @@ -81,7 +78,6 @@ var TestInboxReaderConfig = InboxReaderConfig{ TargetMessagesRead: 500, MaxBlocksToRead: 2000, ReadMode: "latest", - FirstBatchToKeep: 0, } type InboxReader struct { @@ -143,7 +139,7 @@ func (r *InboxReader) Start(ctxIn context.Context) error { return err } if batchCount > 0 { - if r.tracker.GetFirstBatchToKeep() != 0 { + if r.tracker.snapSyncConfig.Enabled { break } // Validate the init message matches our L2 blockchain diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index e979979dea..d3ae8a8c34 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -61,12 +61,13 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (* } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &DefaultTransactionStreamerConfig } + snapSyncConfigFetcher := func() *SnapSyncConfig { return &DefaultSnapSyncConfig } execEngine, err := gethexec.NewExecutionEngine(bc) if err != nil { Fail(t, err) } execSeq := &execClientWrapper{execEngine, t} - inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher) + inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, snapSyncConfigFetcher) if err != nil { Fail(t, err) } diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 8341ed0e93..e14049b365 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -33,30 +33,30 @@ var ( ) type InboxTracker struct { - db ethdb.Database - txStreamer *TransactionStreamer - mutex sync.Mutex - validator *staker.BlockValidator - das arbstate.DataAvailabilityReader - blobReader arbstate.BlobReader - firstBatchToKeep uint64 + db ethdb.Database + txStreamer *TransactionStreamer + mutex sync.Mutex + validator *staker.BlockValidator + das arbstate.DataAvailabilityReader + blobReader 
arbstate.BlobReader + snapSyncConfig SnapSyncConfig batchMetaMutex sync.Mutex batchMeta *containers.LruCache[uint64, BatchMetadata] } -func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader, firstBatchToKeep uint64) (*InboxTracker, error) { +func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) { // We support a nil txStreamer for the pruning code if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil { return nil, errors.New("data availability service required but unconfigured") } tracker := &InboxTracker{ - db: db, - txStreamer: txStreamer, - das: das, - blobReader: blobReader, - batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), - firstBatchToKeep: firstBatchToKeep, + db: db, + txStreamer: txStreamer, + das: das, + blobReader: blobReader, + batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), + snapSyncConfig: snapSyncConfig, } return tracker, nil } @@ -211,10 +211,6 @@ func (t *InboxTracker) GetBatchParentChainBlock(seqNum uint64) (uint64, error) { return metadata.ParentChainBlock, err } -func (t *InboxTracker) GetFirstBatchToKeep() uint64 { - return t.firstBatchToKeep -} - // GetBatchAcc is a convenience function wrapping GetBatchMetadata func (t *InboxTracker) GetBatchAcc(seqNum uint64) (common.Hash, error) { metadata, err := t.GetBatchMetadata(seqNum) @@ -390,18 +386,27 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) { func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error { var nextAcc common.Hash - for len(messages) > 0 { - pos, err := messages[0].Message.Header.SeqNum() - if err != nil { - return err - } - if pos+1 == t.firstBatchToKeep { - nextAcc = messages[0].AfterInboxAcc() + firstBatchToKeep := uint64(0) + if t.snapSyncConfig.Enabled { + firstBatchToKeep = t.snapSyncConfig.BatchCount + if firstBatchToKeep > 0 { + firstBatchToKeep-- } - if pos < t.firstBatchToKeep { - messages = messages[1:] - } else { - break + } + if t.snapSyncConfig.Enabled { + for len(messages) > 0 { + pos, err := messages[0].Message.Header.SeqNum() + if err != nil { + return err + } + if pos+1 == firstBatchToKeep { + nextAcc = messages[0].AfterInboxAcc() + } + if pos < firstBatchToKeep { + messages = messages[1:] + } else { + break + } } } if len(messages) == 0 { @@ -428,7 +433,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR } } - if pos > t.firstBatchToKeep { + if pos > firstBatchToKeep { var err error nextAcc, err = t.GetDelayedAcc(pos - 1) if err != nil { @@ -618,24 +623,27 @@ var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missi func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch) error { var nextAcc common.Hash var prevbatchmeta BatchMetadata - sequenceNumberToKeep := t.firstBatchToKeep - if sequenceNumberToKeep > 0 { - sequenceNumberToKeep-- - } - for len(batches) > 0 { - if batches[0].SequenceNumber+1 == sequenceNumberToKeep { - nextAcc = batches[0].AfterInboxAcc - prevbatchmeta = BatchMetadata{ - Accumulator: batches[0].AfterInboxAcc, - DelayedMessageCount: batches[0].AfterDelayedCount, - //MessageCount: batchMessageCounts[batches[0].SequenceNumber], - ParentChainBlock: 
batches[0].ParentChainBlockNumber, + sequenceNumberToKeep := uint64(0) + if t.snapSyncConfig.Enabled { + sequenceNumberToKeep = t.snapSyncConfig.BatchCount + if sequenceNumberToKeep > 0 { + sequenceNumberToKeep-- + } + for len(batches) > 0 { + if batches[0].SequenceNumber+1 == sequenceNumberToKeep { + nextAcc = batches[0].AfterInboxAcc + prevbatchmeta = BatchMetadata{ + Accumulator: batches[0].AfterInboxAcc, + DelayedMessageCount: batches[0].AfterDelayedCount, + MessageCount: arbutil.MessageIndex(t.snapSyncConfig.PrevBatchMessageCount), + ParentChainBlock: batches[0].ParentChainBlockNumber, + } + } + if batches[0].SequenceNumber < sequenceNumberToKeep { + batches = batches[1:] + } else { + break } - } - if batches[0].SequenceNumber < sequenceNumberToKeep { - batches = batches[1:] - } else { - break } } if len(batches) == 0 { @@ -672,7 +680,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L return errors.New("previous batch accumulator mismatch") } - if batch.AfterDelayedCount > 0 && t.firstBatchToKeep == 0 { + if batch.AfterDelayedCount > 0 && !t.snapSyncConfig.Enabled { haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1) if errors.Is(err, AccumulatorNotFoundErr) { // We somehow missed a referenced delayed message; go back and look for it diff --git a/arbnode/node.go b/arbnode/node.go index f687942d57..b690276d9e 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -93,6 +93,7 @@ type Config struct { TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` + SnapSync SnapSyncConfig `koanf:"snap-sync" reload:"hot"` } func (c *Config) Validate() error { @@ -156,6 +157,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed DangerousConfigAddOptions(prefix+".dangerous", f) TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f) MaintenanceConfigAddOptions(prefix+".maintenance", f) + SnapSyncConfigAddOptions(prefix+".snap-sync", f) } var ConfigDefault = Config{ @@ -175,6 +177,7 @@ var ConfigDefault = Config{ TransactionStreamer: DefaultTransactionStreamerConfig, ResourceMgmt: resourcemanager.DefaultConfig, Maintenance: DefaultMaintenanceConfig, + SnapSync: DefaultSnapSyncConfig, } func ConfigDefaultL1Test() *Config { @@ -273,6 +276,31 @@ type Node struct { ctx context.Context } +type SnapSyncConfig struct { + Enabled bool `koanf:"enabled" reload:"hot"` + PrevBatchMessageCount uint64 `koanf:"prev-batch-message-count" reload:"hot"` + PrevDelayedRead uint64 `koanf:"prev-delayed-read" reload:"hot"` + BatchCount uint64 `koanf:"batch-count" reload:"hot"` + DelayedCount uint64 `koanf:"delayed-count" reload:"hot"` +} + +var DefaultSnapSyncConfig = SnapSyncConfig{ + Enabled: false, + PrevBatchMessageCount: 0, + BatchCount: 0, + DelayedCount: 0, + PrevDelayedRead: 0, +} + +func SnapSyncConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Bool(prefix+".enabled", DefaultSnapSyncConfig.Enabled, "enable snap sync") + f.Uint64(prefix+".prev-batch-message-count", DefaultSnapSyncConfig.PrevBatchMessageCount, "previous batch message count") + f.Uint64(prefix+".batch-count", DefaultSnapSyncConfig.BatchCount, "batch count") + f.Uint64(prefix+".delayed-count", DefaultSnapSyncConfig.DelayedCount, "delayed count") + f.Uint64(prefix+".prev-delayed-read", DefaultSnapSyncConfig.PrevDelayedRead, "previous delayed read") + +} + type ConfigFetcher interface 
{ Get() *Config Start(context.Context) @@ -410,7 +438,8 @@ func createNodeImpl( } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer } - txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher) + snapSyncConfigFetcher := func() *SnapSyncConfig { return &configFetcher.Get().SnapSync } + txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, snapSyncConfigFetcher) if err != nil { return nil, err } @@ -529,7 +558,7 @@ func createNodeImpl( return nil, errors.New("a data availability service is required for this chain, but it was not configured") } - inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, config.InboxReader.FirstBatchToKeep) + inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, config.SnapSync) if err != nil { return nil, err } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 017c23c496..b4a2a637fc 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -50,9 +50,10 @@ type TransactionStreamer struct { execLastMsgCount arbutil.MessageIndex validator *staker.BlockValidator - db ethdb.Database - fatalErrChan chan<- error - config TransactionStreamerConfigFetcher + db ethdb.Database + fatalErrChan chan<- error + config TransactionStreamerConfigFetcher + snapSyncConfig SnapSyncConfigFetcher insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held reorgMutex sync.RWMutex @@ -80,6 +81,7 @@ type TransactionStreamerConfig struct { } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig +type SnapSyncConfigFetcher func() *SnapSyncConfig var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, @@ -106,6 +108,7 @@ func NewTransactionStreamer( broadcastServer *broadcaster.Broadcaster, fatalErrChan chan<- error, config TransactionStreamerConfigFetcher, + snapSyncConfig SnapSyncConfigFetcher, ) (*TransactionStreamer, error) { streamer := &TransactionStreamer{ exec: exec, @@ -115,6 +118,7 @@ func NewTransactionStreamer( broadcastServer: broadcastServer, fatalErrChan: fatalErrChan, config: config, + snapSyncConfig: snapSyncConfig, cachedL1PriceData: &L1PriceData{ msgToL1PriceData: []L1PriceDataOfMsg{}, }, @@ -721,6 +725,9 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m } func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) { + if s.snapSyncConfig().Enabled && uint64(pos) == s.snapSyncConfig().PrevBatchMessageCount { + return s.snapSyncConfig().PrevDelayedRead, nil + } var prevDelayedRead uint64 if pos > 0 { prevMsg, err := s.GetMessage(pos - 1) diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index c65df7d441..926f9ffc3a 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node return nil, fmt.Errorf("failed to get finalized block: %w", err) } l1BlockNum := l1Block.NumberU64() - tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil, 0) + tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil, arbnode.DefaultSnapSyncConfig) if err != nil { return nil, err } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index bff4ca430f..3c0ee84526 100644 --- 
a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -336,7 +336,7 @@ func BridgeBalance( break } TransferBalance(t, "Faucet", "User", big.NewInt(1), l1info, l1client, ctx) - if i > 50 { + if i > 2p { Fatal(t, "bridging failed") } <-time.After(time.Millisecond * 100) diff --git a/system_tests/sync_test.go b/system_tests/snap_sync_test.go similarity index 70% rename from system_tests/sync_test.go rename to system_tests/snap_sync_test.go index 97c5118598..e89cd93963 100644 --- a/system_tests/sync_test.go +++ b/system_tests/snap_sync_test.go @@ -15,7 +15,7 @@ import ( "time" ) -func TestSync(t *testing.T) { +func TestSnapSync(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -27,8 +27,7 @@ func TestSync(t *testing.T) { types.NewArbitrumSigner(types.NewLondonSigner(builder.chainConfig.ChainID)), big.NewInt(l2pricing.InitialBaseFeeWei*2), transferGas, ) - cleanup := builder.Build(t) - defer cleanup() + builder.Build(t) builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) @@ -47,16 +46,26 @@ func TestSync(t *testing.T) { } <-time.After(time.Second * 5) - count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + delayedCount, err := builder.L2.ConsensusNode.InboxTracker.GetDelayedCount() + Require(t, err) + // Last batch is batchCount - 1, so prev batch is batchCount - 2 + prevBatchMetaData, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(batchCount - 2) + Require(t, err) + prevMessage, err := builder.L2.ConsensusNode.TxStreamer.GetMessage(prevBatchMetaData.MessageCount - 1) Require(t, err) nodeConfig := builder.nodeConfig - nodeConfig.InboxReader.FirstBatchToKeep = count - + nodeConfig.SnapSync.Enabled = true + nodeConfig.SnapSync.BatchCount = batchCount + nodeConfig.SnapSync.DelayedCount = delayedCount + nodeConfig.SnapSync.PrevDelayedRead = prevMessage.DelayedMessagesRead + nodeConfig.SnapSync.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) err = os.RemoveAll(builder.l2StackConfig.ResolvePath("arbitrumdata")) Require(t, err) builder.L2.cleanup() - + defer builder.L1.cleanup() nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig, nodeConfig: nodeConfig}) defer cleanupB() for { From b718627e04ab97c9ecde19ff17d2102666b1bf1f Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 2 May 2024 19:10:47 +0530 Subject: [PATCH 03/21] fix typo --- system_tests/common_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system_tests/common_test.go b/system_tests/common_test.go index ba3de97289..8d783c4564 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -348,7 +348,7 @@ func BridgeBalance( break } TransferBalance(t, "Faucet", "User", big.NewInt(1), l1info, l1client, ctx) - if i > 2p { + if i > 20 { Fatal(t, "bridging failed") } <-time.After(time.Millisecond * 100) From 778eb25237b6152c05707dc6349258715dfa5acb Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 2 May 2024 19:25:24 +0530 Subject: [PATCH 04/21] cleanup --- arbnode/inbox_tracker.go | 2 -- system_tests/snap_sync_test.go | 8 +++++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index e14049b365..1a29ac6623 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -392,8 +392,6 @@ func (t *InboxTracker) AddDelayedMessages(messages 
[]*DelayedInboxMessage, hardR if firstBatchToKeep > 0 { firstBatchToKeep-- } - } - if t.snapSyncConfig.Enabled { for len(messages) > 0 { pos, err := messages[0].Message.Header.SeqNum() if err != nil { diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index e89cd93963..fc9ffbda85 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -32,6 +32,7 @@ func TestSnapSync(t *testing.T) { builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) builder.L2Info.GenerateAccount("BackgroundUser") + // Sync node till batch count is 10 for { tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) err := builder.L2.Client.SendTransaction(ctx, tx) @@ -55,19 +56,25 @@ func TestSnapSync(t *testing.T) { Require(t, err) prevMessage, err := builder.L2.ConsensusNode.TxStreamer.GetMessage(prevBatchMetaData.MessageCount - 1) Require(t, err) + // Create a config with snap sync enabled and same database directory as the first node nodeConfig := builder.nodeConfig nodeConfig.SnapSync.Enabled = true nodeConfig.SnapSync.BatchCount = batchCount nodeConfig.SnapSync.DelayedCount = delayedCount nodeConfig.SnapSync.PrevDelayedRead = prevMessage.DelayedMessagesRead nodeConfig.SnapSync.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) + // Cleanup the message data, but keep the block state data. + // This is to simulate a snap sync environment where we’ve just gotten the block state but don’t have any messages. err = os.RemoveAll(builder.l2StackConfig.ResolvePath("arbitrumdata")) Require(t, err) + // Cleanup the previous node to release the database lock builder.L2.cleanup() defer builder.L1.cleanup() + // New node with snap sync enabled, and the same database directory as the first node but with no message data. nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig, nodeConfig: nodeConfig}) defer cleanupB() + // Sync node till batch count is 20 for { tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) err := nodeB.Client.SendTransaction(ctx, tx) @@ -80,5 +87,4 @@ func TestSnapSync(t *testing.T) { break } } - } From a6d0c92c06c26ec79238f3f88fe858e54d3429f0 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 2 May 2024 19:36:45 +0530 Subject: [PATCH 05/21] fix test --- system_tests/snap_sync_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index fc9ffbda85..78967270d6 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -29,6 +29,8 @@ func TestSnapSync(t *testing.T) { ) builder.Build(t) + // Added a delay, since BridgeBalance times out if the node is just created and not synced. 
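The consolidated trim loop above is the heart of snap sync on the delayed-message side: drop the prefix of incoming messages that the pruned database will never hold, while remembering the after-accumulator of the message just before the cut so the first kept message can still be chained and verified. A minimal standalone sketch of the pattern, with illustrative types rather than nitro's:

package main

import "fmt"

// delayedMsg stands in for nitro's DelayedInboxMessage: a sequence number
// plus the inbox accumulator value after this message is applied.
type delayedMsg struct {
	seqNum   uint64
	afterAcc [32]byte
}

// trimPrunedPrefix drops messages below firstToKeep and, if the message just
// before the cut is among them, captures its after-accumulator so the first
// kept message can be verified without a database lookup.
func trimPrunedPrefix(msgs []delayedMsg, firstToKeep uint64) ([]delayedMsg, [32]byte, bool) {
	var nextAcc [32]byte
	haveAcc := false
	for len(msgs) > 0 {
		pos := msgs[0].seqNum
		if pos+1 == firstToKeep {
			nextAcc = msgs[0].afterAcc
			haveAcc = true
		}
		if pos < firstToKeep {
			msgs = msgs[1:]
		} else {
			break
		}
	}
	return msgs, nextAcc, haveAcc
}

func main() {
	msgs := []delayedMsg{{seqNum: 7}, {seqNum: 8}, {seqNum: 9}, {seqNum: 10}}
	kept, _, haveAcc := trimPrunedPrefix(msgs, 10)
	fmt.Println(len(kept), haveAcc) // 1 true: only message 10 survives the trim
}
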
+ <-time.After(time.Second * 1) builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) builder.L2Info.GenerateAccount("BackgroundUser") From ac9f1710e330385c5cb67dd7726db0f24ac33aeb Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Mon, 6 May 2024 20:50:23 +0530 Subject: [PATCH 06/21] fix --- arbnode/inbox_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 1a29ac6623..3171ecd45f 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -388,7 +388,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR var nextAcc common.Hash firstBatchToKeep := uint64(0) if t.snapSyncConfig.Enabled { - firstBatchToKeep = t.snapSyncConfig.BatchCount + firstBatchToKeep = t.snapSyncConfig.DelayedCount if firstBatchToKeep > 0 { firstBatchToKeep-- } From 9259495a45100cbe84c11eca464d5609d9f6d5f3 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 7 May 2024 17:43:57 +0530 Subject: [PATCH 07/21] Changes based on PR comments --- arbnode/node.go | 12 ++++++------ system_tests/snap_sync_test.go | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index 08aa82f6b2..5ce6426013 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -93,7 +93,7 @@ type Config struct { TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` - SnapSync SnapSyncConfig `koanf:"snap-sync" reload:"hot"` + SnapSync SnapSyncConfig `koanf:"snap-sync"` } func (c *Config) Validate() error { @@ -277,11 +277,11 @@ type Node struct { } type SnapSyncConfig struct { - Enabled bool `koanf:"enabled" reload:"hot"` - PrevBatchMessageCount uint64 `koanf:"prev-batch-message-count" reload:"hot"` - PrevDelayedRead uint64 `koanf:"prev-delayed-read" reload:"hot"` - BatchCount uint64 `koanf:"batch-count" reload:"hot"` - DelayedCount uint64 `koanf:"delayed-count" reload:"hot"` + Enabled bool `koanf:"enabled"` + PrevBatchMessageCount uint64 `koanf:"prev-batch-message-count"` + PrevDelayedRead uint64 `koanf:"prev-delayed-read"` + BatchCount uint64 `koanf:"batch-count"` + DelayedCount uint64 `koanf:"delayed-count"` } var DefaultSnapSyncConfig = SnapSyncConfig{ diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 78967270d6..746ae62f5c 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -47,7 +47,8 @@ func TestSnapSync(t *testing.T) { break } } - <-time.After(time.Second * 5) + // Wait for the last batch to be processed + <-time.After(10 * time.Millisecond) batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) From b71bcd417f96ee3f5df9b7333fad36ba3069ab28 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 7 May 2024 18:20:06 +0530 Subject: [PATCH 08/21] Changes based on PR comments --- system_tests/snap_sync_test.go | 76 +++++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 19 deletions(-) diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 746ae62f5c..3b052fda36 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -21,20 +21,25 @@ func TestSnapSync(t *testing.T) { var transferGas = util.NormalizeL2GasForL1GasInitial(800_000, params.GWei) // include room for aggregator L1 costs + // 1st node with 
sequencer, stays up all the time. builder := NewNodeBuilder(ctx).DefaultConfig(t, true) builder.L2Info = NewBlockChainTestInfo( t, types.NewArbitrumSigner(types.NewLondonSigner(builder.chainConfig.ChainID)), big.NewInt(l2pricing.InitialBaseFeeWei*2), transferGas, ) - builder.Build(t) + cleanup := builder.Build(t) + defer cleanup() + + // 2nd node without sequencer, syncs up to the first node. + // This node will be stopped in middle and arbitrumdata will be deleted. + nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{}) - // Added a delay, since BridgeBalance times out if the node is just created and not synced. - <-time.After(time.Second * 1) builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) builder.L2Info.GenerateAccount("BackgroundUser") - // Sync node till batch count is 10 + + // Create transactions till batch count is 10 for { tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) err := builder.L2.Client.SendTransaction(ctx, tx) @@ -46,9 +51,20 @@ func TestSnapSync(t *testing.T) { if count > 10 { break } + + } + // Wait for nodeB to sync up to the first node + for { + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + countNodeB, err := nodeB.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + if count == countNodeB { + break + } else { + <-time.After(10 * time.Millisecond) + } } - // Wait for the last batch to be processed - <-time.After(10 * time.Millisecond) batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) @@ -59,35 +75,57 @@ func TestSnapSync(t *testing.T) { Require(t, err) prevMessage, err := builder.L2.ConsensusNode.TxStreamer.GetMessage(prevBatchMetaData.MessageCount - 1) Require(t, err) - // Create a config with snap sync enabled and same database directory as the first node + // Create a config with snap sync enabled and same database directory as the 2nd node nodeConfig := builder.nodeConfig nodeConfig.SnapSync.Enabled = true nodeConfig.SnapSync.BatchCount = batchCount nodeConfig.SnapSync.DelayedCount = delayedCount nodeConfig.SnapSync.PrevDelayedRead = prevMessage.DelayedMessagesRead nodeConfig.SnapSync.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) - // Cleanup the message data, but keep the block state data. + // Cleanup the message data of 2nd node, but keep the block state data. // This is to simulate a snap sync environment where we’ve just gotten the block state but don’t have any messages. - err = os.RemoveAll(builder.l2StackConfig.ResolvePath("arbitrumdata")) + err = os.RemoveAll(nodeB.ConsensusNode.Stack.ResolvePath("arbitrumdata")) Require(t, err) - // Cleanup the previous node to release the database lock - builder.L2.cleanup() - defer builder.L1.cleanup() - // New node with snap sync enabled, and the same database directory as the first node but with no message data. - nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig, nodeConfig: nodeConfig}) - defer cleanupB() - // Sync node till batch count is 20 + // Cleanup the 2nd node to release the database lock + cleanupB() + // New node with snap sync enabled, and the same database directory as the 2nd node but with no message data. 
+ nodeC, cleanupC := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: nodeB.ConsensusNode.Stack.Config(), nodeConfig: nodeConfig}) + defer cleanupC() + + // Create transactions till batch count is 20 for { tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) - err := nodeB.Client.SendTransaction(ctx, tx) + err := builder.L2.Client.SendTransaction(ctx, tx) Require(t, err) - _, err = nodeB.EnsureTxSucceeded(tx) + _, err = builder.L2.EnsureTxSucceeded(tx) Require(t, err) - count, err := nodeB.ConsensusNode.InboxTracker.GetBatchCount() + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) if count > 20 { break } } + // Wait for nodeB to sync up to the first node + for { + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + countNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + if count == countNodeC { + // Once the node is synced up, check if the batch metadata is the same for the last batch + // This is to ensure that the snap sync worked correctly + metadata, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(count - 1) + Require(t, err) + metadataNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(countNodeC - 1) + Require(t, err) + if metadata != metadataNodeC { + t.Error("Batch metadata mismatch") + } + break + } else { + <-time.After(10 * time.Millisecond) + } + } + } From c57bbae650dce5545ea27064bd6f123fe36ce30b Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 9 May 2024 20:06:11 +0530 Subject: [PATCH 09/21] Changes based on PR comments --- arbnode/node.go | 29 ++++++++++------------------- system_tests/snap_sync_test.go | 31 +++++++++++++++++++++++-------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index 65a3bb2d7d..c346a38e14 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -93,7 +93,8 @@ type Config struct { TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` - SnapSync SnapSyncConfig `koanf:"snap-sync"` + // SnapSyncConfig is only used for testing purposes, these should not be configured in production. 
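The struct that follows defines the snap sync seed values. For orientation, this is a sketch of how the test above derives each one from the fully synced source node before wiping arbitrumdata; the helper is an illustration, not code from the patch:

package main

import "fmt"

// snapSyncSeed mirrors the five SnapSyncConfig values used in this series.
type snapSyncSeed struct {
	Enabled               bool
	BatchCount            uint64
	DelayedCount          uint64
	PrevBatchMessageCount uint64
	PrevDelayedRead       uint64
}

// deriveSeed packages values read off the source node:
//   - batchCount / delayedCount: the source InboxTracker's current counts
//   - prevBatchMsgCount: MessageCount from the metadata of batch batchCount-2
//     (the batch before the last one)
//   - prevDelayedRead: DelayedMessagesRead of message prevBatchMsgCount-1
func deriveSeed(batchCount, delayedCount, prevBatchMsgCount, prevDelayedRead uint64) snapSyncSeed {
	return snapSyncSeed{
		Enabled:               true,
		BatchCount:            batchCount,
		DelayedCount:          delayedCount, // a later patch in the series derives this from batch metadata instead
		PrevBatchMessageCount: prevBatchMsgCount,
		PrevDelayedRead:       prevDelayedRead,
	}
}

func main() {
	fmt.Printf("%+v\n", deriveSeed(11, 4, 120, 3))
}
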
+ SnapSyncTest SnapSyncConfig } func (c *Config) Validate() error { @@ -157,7 +158,6 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed DangerousConfigAddOptions(prefix+".dangerous", f) TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f) MaintenanceConfigAddOptions(prefix+".maintenance", f) - SnapSyncConfigAddOptions(prefix+".snap-sync", f) } var ConfigDefault = Config{ @@ -177,7 +177,7 @@ var ConfigDefault = Config{ TransactionStreamer: DefaultTransactionStreamerConfig, ResourceMgmt: resourcemanager.DefaultConfig, Maintenance: DefaultMaintenanceConfig, - SnapSync: DefaultSnapSyncConfig, + SnapSyncTest: DefaultSnapSyncConfig, } func ConfigDefaultL1Test() *Config { @@ -277,11 +277,11 @@ type Node struct { } type SnapSyncConfig struct { - Enabled bool `koanf:"enabled"` - PrevBatchMessageCount uint64 `koanf:"prev-batch-message-count"` - PrevDelayedRead uint64 `koanf:"prev-delayed-read"` - BatchCount uint64 `koanf:"batch-count"` - DelayedCount uint64 `koanf:"delayed-count"` + Enabled bool + PrevBatchMessageCount uint64 + PrevDelayedRead uint64 + BatchCount uint64 + DelayedCount uint64 } var DefaultSnapSyncConfig = SnapSyncConfig{ @@ -292,15 +292,6 @@ var DefaultSnapSyncConfig = SnapSyncConfig{ PrevDelayedRead: 0, } -func SnapSyncConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Bool(prefix+".enabled", DefaultSnapSyncConfig.Enabled, "enable snap sync") - f.Uint64(prefix+".prev-batch-message-count", DefaultSnapSyncConfig.PrevBatchMessageCount, "previous batch message count") - f.Uint64(prefix+".batch-count", DefaultSnapSyncConfig.BatchCount, "batch count") - f.Uint64(prefix+".delayed-count", DefaultSnapSyncConfig.DelayedCount, "delayed count") - f.Uint64(prefix+".prev-delayed-read", DefaultSnapSyncConfig.PrevDelayedRead, "previous delayed read") - -} - type ConfigFetcher interface { Get() *Config Start(context.Context) @@ -438,7 +429,7 @@ func createNodeImpl( } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer } - snapSyncConfigFetcher := func() *SnapSyncConfig { return &configFetcher.Get().SnapSync } + snapSyncConfigFetcher := func() *SnapSyncConfig { return &configFetcher.Get().SnapSyncTest } txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, snapSyncConfigFetcher) if err != nil { return nil, err @@ -569,7 +560,7 @@ func createNodeImpl( if blobReader != nil { dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader)) } - inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSync) + inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest) if err != nil { return nil, err } diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 3b052fda36..87bf09f6d7 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -55,11 +55,11 @@ func TestSnapSync(t *testing.T) { } // Wait for nodeB to sync up to the first node for { - count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + header, err := builder.L2.Client.HeaderByNumber(ctx, nil) Require(t, err) - countNodeB, err := nodeB.ConsensusNode.InboxTracker.GetBatchCount() + headerNodeB, err := nodeB.Client.HeaderByNumber(ctx, nil) Require(t, err) - if count == countNodeB { + if header.Number.Cmp(headerNodeB.Number) == 0 { break } else { <-time.After(10 * time.Millisecond) @@ -77,11 +77,11 @@ func TestSnapSync(t 
*testing.T) { Require(t, err) // Create a config with snap sync enabled and same database directory as the 2nd node nodeConfig := builder.nodeConfig - nodeConfig.SnapSync.Enabled = true - nodeConfig.SnapSync.BatchCount = batchCount - nodeConfig.SnapSync.DelayedCount = delayedCount - nodeConfig.SnapSync.PrevDelayedRead = prevMessage.DelayedMessagesRead - nodeConfig.SnapSync.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) + nodeConfig.SnapSyncTest.Enabled = true + nodeConfig.SnapSyncTest.BatchCount = batchCount + nodeConfig.SnapSyncTest.DelayedCount = delayedCount + nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead + nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) // Cleanup the message data of 2nd node, but keep the block state data. // This is to simulate a snap sync environment where we’ve just gotten the block state but don’t have any messages. err = os.RemoveAll(nodeB.ConsensusNode.Stack.ResolvePath("arbitrumdata")) @@ -126,6 +126,21 @@ func TestSnapSync(t *testing.T) { } else { <-time.After(10 * time.Millisecond) } + + header, err := builder.L2.Client.HeaderByNumber(ctx, nil) + Require(t, err) + headerNodeB, err := nodeB.Client.HeaderByNumber(ctx, nil) + Require(t, err) + if header.Number.Cmp(headerNodeB.Number) == 0 { + // Once the node is synced up, check if the block hash is the same for the last block + // This is to ensure that the snap sync worked correctly + if header.Hash().Cmp(headerNodeB.Hash()) != 0 { + t.Error("Block hash mismatch") + } + break + } else { + <-time.After(10 * time.Millisecond) + } } } From a4d4aeb8a56645e4776cf0af42c8cd382fa04349 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 14 May 2024 16:48:16 +0530 Subject: [PATCH 10/21] Changes based on PR comments --- arbnode/inbox_test.go | 3 +-- arbnode/node.go | 3 +-- arbnode/transaction_streamer.go | 9 ++++----- system_tests/snap_sync_test.go | 2 +- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 2e48b367c3..259a25f439 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -61,13 +61,12 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (* } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &DefaultTransactionStreamerConfig } - snapSyncConfigFetcher := func() *SnapSyncConfig { return &DefaultSnapSyncConfig } execEngine, err := gethexec.NewExecutionEngine(bc) if err != nil { Fail(t, err) } execSeq := &execClientWrapper{execEngine, t} - inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, snapSyncConfigFetcher) + inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig) if err != nil { Fail(t, err) } diff --git a/arbnode/node.go b/arbnode/node.go index c346a38e14..fd07a87de1 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -429,8 +429,7 @@ func createNodeImpl( } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer } - snapSyncConfigFetcher := func() *SnapSyncConfig { return &configFetcher.Get().SnapSyncTest } - txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, snapSyncConfigFetcher) + txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, 
transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest) if err != nil { return nil, err } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 07b467cd3e..85ccc7f642 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -53,7 +53,7 @@ type TransactionStreamer struct { db ethdb.Database fatalErrChan chan<- error config TransactionStreamerConfigFetcher - snapSyncConfig SnapSyncConfigFetcher + snapSyncConfig *SnapSyncConfig insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held reorgMutex sync.RWMutex @@ -81,7 +81,6 @@ type TransactionStreamerConfig struct { } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig -type SnapSyncConfigFetcher func() *SnapSyncConfig var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, @@ -108,7 +107,7 @@ func NewTransactionStreamer( broadcastServer *broadcaster.Broadcaster, fatalErrChan chan<- error, config TransactionStreamerConfigFetcher, - snapSyncConfig SnapSyncConfigFetcher, + snapSyncConfig *SnapSyncConfig, ) (*TransactionStreamer, error) { streamer := &TransactionStreamer{ exec: exec, @@ -738,8 +737,8 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m } func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) { - if s.snapSyncConfig().Enabled && uint64(pos) == s.snapSyncConfig().PrevBatchMessageCount { - return s.snapSyncConfig().PrevDelayedRead, nil + if s.snapSyncConfig.Enabled && uint64(pos) == s.snapSyncConfig.PrevBatchMessageCount { + return s.snapSyncConfig.PrevDelayedRead, nil } var prevDelayedRead uint64 if pos > 0 { diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 87bf09f6d7..87b66958e0 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -122,9 +122,9 @@ func TestSnapSync(t *testing.T) { if metadata != metadataNodeC { t.Error("Batch metadata mismatch") } - break } else { <-time.After(10 * time.Millisecond) + continue } header, err := builder.L2.Client.HeaderByNumber(ctx, nil) From 95c79c24a331f77f3d26c2eb13401a59f4f61028 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 14 May 2024 17:43:55 +0530 Subject: [PATCH 11/21] minor fix --- system_tests/snap_sync_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 87b66958e0..9497ad01b3 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -122,19 +122,20 @@ func TestSnapSync(t *testing.T) { if metadata != metadataNodeC { t.Error("Batch metadata mismatch") } + break } else { <-time.After(10 * time.Millisecond) - continue } - + } + for { header, err := builder.L2.Client.HeaderByNumber(ctx, nil) Require(t, err) - headerNodeB, err := nodeB.Client.HeaderByNumber(ctx, nil) + headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, nil) Require(t, err) - if header.Number.Cmp(headerNodeB.Number) == 0 { + if header.Number.Cmp(headerNodeC.Number) == 0 { // Once the node is synced up, check if the block hash is the same for the last block // This is to ensure that the snap sync worked correctly - if header.Hash().Cmp(headerNodeB.Hash()) != 0 { + if header.Hash().Cmp(headerNodeC.Hash()) != 0 { t.Error("Block hash mismatch") } break From 3dbe2cdcd52c6c47053a46bd992be0d57bf7a6ba Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 14 May 2024 17:44:51 +0530 Subject: [PATCH 12/21] Changes based 
on PR comments --- arbnode/inbox_tracker.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 335d94b3cd..bcdf95bb6c 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -389,13 +389,20 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) { func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error { var nextAcc common.Hash firstBatchToKeep := uint64(0) - if t.snapSyncConfig.Enabled { + if len(messages) == 0 { + return nil + } + pos, err := messages[0].Message.Header.SeqNum() + if err != nil { + return err + } + if t.snapSyncConfig.Enabled && pos < t.snapSyncConfig.DelayedCount { firstBatchToKeep = t.snapSyncConfig.DelayedCount if firstBatchToKeep > 0 { firstBatchToKeep-- } for len(messages) > 0 { - pos, err := messages[0].Message.Header.SeqNum() + pos, err = messages[0].Message.Header.SeqNum() if err != nil { return err } @@ -409,13 +416,10 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR } } } - if len(messages) == 0 { - return nil - } t.mutex.Lock() defer t.mutex.Unlock() - pos, err := messages[0].Message.Header.SeqNum() + pos, err = messages[0].Message.Header.SeqNum() if err != nil { return err } @@ -624,7 +628,10 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L var nextAcc common.Hash var prevbatchmeta BatchMetadata sequenceNumberToKeep := uint64(0) - if t.snapSyncConfig.Enabled { + if len(batches) == 0 { + return nil + } + if t.snapSyncConfig.Enabled && batches[0].SequenceNumber < t.snapSyncConfig.BatchCount { sequenceNumberToKeep = t.snapSyncConfig.BatchCount if sequenceNumberToKeep > 0 { sequenceNumberToKeep-- @@ -646,9 +653,6 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L } } } - if len(batches) == 0 { - return nil - } t.mutex.Lock() defer t.mutex.Unlock() From 46ddd690975d37fcbff0bd156876cd7c18add284 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 14 May 2024 21:17:28 +0530 Subject: [PATCH 13/21] Changes based on PR comments --- arbnode/inbox_tracker.go | 2 +- system_tests/snap_sync_test.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index bcdf95bb6c..1aa0ff7687 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -684,7 +684,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L return errors.New("previous batch accumulator mismatch") } - if batch.AfterDelayedCount > 0 && !t.snapSyncConfig.Enabled { + if batch.AfterDelayedCount > 0 { haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1) if errors.Is(err, AccumulatorNotFoundErr) { // We somehow missed a referenced delayed message; go back and look for it diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 9497ad01b3..a5f84b1982 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -68,8 +68,6 @@ func TestSnapSync(t *testing.T) { batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) - delayedCount, err := builder.L2.ConsensusNode.InboxTracker.GetDelayedCount() - Require(t, err) // Last batch is batchCount - 1, so prev batch is batchCount - 2 prevBatchMetaData, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(batchCount - 2) Require(t, err) @@ -79,7 +77,7 @@ func TestSnapSync(t *testing.T) { 
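The sequencer-batch side of the trim, guarded just above so that it only runs when the first incoming batch is below the configured batch count, has one extra wrinkle: the metadata of the batch before the cut must be synthesized on the fly, and its MessageCount cannot be recovered from L1 data, which is why it comes from SnapSyncConfig.PrevBatchMessageCount. A simplified sketch of that boundary handling, assuming illustrative types:

package main

import "fmt"

// seqBatch stands in for nitro's SequencerInboxBatch.
type seqBatch struct {
	SequenceNumber         uint64
	AfterInboxAcc          [32]byte
	AfterDelayedCount      uint64
	ParentChainBlockNumber uint64
}

// batchMeta stands in for nitro's BatchMetadata.
type batchMeta struct {
	Accumulator         [32]byte
	DelayedMessageCount uint64
	MessageCount        uint64 // not on L1; seeded from the snap sync config
	ParentChainBlock    uint64
}

// trimBatches drops batches below keep (BatchCount-1 in the real code) and,
// when batch keep-1 passes by, synthesizes its metadata from the batch itself
// plus prevMsgCount, which the real code takes from PrevBatchMessageCount.
func trimBatches(batches []seqBatch, keep, prevMsgCount uint64) ([]seqBatch, batchMeta) {
	var prev batchMeta
	for len(batches) > 0 {
		b := batches[0]
		if b.SequenceNumber+1 == keep {
			prev = batchMeta{
				Accumulator:         b.AfterInboxAcc,
				DelayedMessageCount: b.AfterDelayedCount,
				MessageCount:        prevMsgCount,
				ParentChainBlock:    b.ParentChainBlockNumber,
			}
		}
		if b.SequenceNumber < keep {
			batches = batches[1:]
		} else {
			break
		}
	}
	return batches, prev
}

func main() {
	batches := []seqBatch{{SequenceNumber: 8}, {SequenceNumber: 9, AfterDelayedCount: 3}, {SequenceNumber: 10}}
	kept, prev := trimBatches(batches, 10, 120)
	fmt.Println(len(kept), prev.DelayedMessageCount, prev.MessageCount) // 1 3 120
}
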
nodeConfig := builder.nodeConfig nodeConfig.SnapSyncTest.Enabled = true nodeConfig.SnapSyncTest.BatchCount = batchCount - nodeConfig.SnapSyncTest.DelayedCount = delayedCount + nodeConfig.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1 nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) // Cleanup the message data of 2nd node, but keep the block state data. From 21208fb5202029579698d0ca754c4291f1640db8 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 15 May 2024 09:53:33 +0530 Subject: [PATCH 14/21] fix test --- system_tests/snap_sync_test.go | 46 +++++++++++++++------------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index a5f84b1982..2dcd22564f 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -110,36 +110,30 @@ func TestSnapSync(t *testing.T) { Require(t, err) countNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) - if count == countNodeC { - // Once the node is synced up, check if the batch metadata is the same for the last batch - // This is to ensure that the snap sync worked correctly - metadata, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(count - 1) - Require(t, err) - metadataNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(countNodeC - 1) - Require(t, err) - if metadata != metadataNodeC { - t.Error("Batch metadata mismatch") - } - break - } else { + if count != countNodeC { <-time.After(10 * time.Millisecond) + continue } - } - for { - header, err := builder.L2.Client.HeaderByNumber(ctx, nil) + // Once the node is synced up, check if the batch metadata is the same for the last batch + // This is to ensure that the snap sync worked correctly + metadata, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(count - 1) Require(t, err) - headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, nil) + metadataNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(countNodeC - 1) Require(t, err) - if header.Number.Cmp(headerNodeC.Number) == 0 { - // Once the node is synced up, check if the block hash is the same for the last block - // This is to ensure that the snap sync worked correctly - if header.Hash().Cmp(headerNodeC.Hash()) != 0 { - t.Error("Block hash mismatch") - } - break - } else { - <-time.After(10 * time.Millisecond) + if metadata != metadataNodeC { + t.Error("Batch metadata mismatch") + } + // Fetching message count - 1 instead on the latest block number as the latest block number might not be + // present in the snap sync node since it does not the sequencer feed. 
+ header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(metadata.MessageCount)-1))
+ Require(t, err)
+ headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, big.NewInt(int64(metadata.MessageCount)-1))
+ Require(t, err)
+ // Once the node is synced up, check if the block hash is the same for the last block
+ // This is to ensure that the snap sync worked correctly
+ if header.Hash().Cmp(headerNodeC.Hash()) != 0 {
+ t.Error("Block hash mismatch")
}
+ break
}
- }

From ddde138a6346871cbbc1165f97027946d37b76f9 Mon Sep 17 00:00:00 2001
From: Aman Sanghi
Date: Wed, 15 May 2024 10:27:37 +0530
Subject: [PATCH 15/21] fix race condition

---
 system_tests/snap_sync_test.go | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go
index 2dcd22564f..fc7ebc1d33 100644
--- a/system_tests/snap_sync_test.go
+++ b/system_tests/snap_sync_test.go
@@ -123,6 +123,24 @@ func TestSnapSync(t *testing.T) {
 if metadata != metadataNodeC {
 t.Error("Batch metadata mismatch")
 }
+ for {
+ latestHeader, err := builder.L2.Client.HeaderByNumber(ctx, nil)
+ Require(t, err)
+ if latestHeader.Number.Uint64() < uint64(metadata.MessageCount)-1 {
+ <-time.After(10 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ for {
+ latestHeaderNodeC, err := nodeC.Client.HeaderByNumber(ctx, nil)
+ Require(t, err)
+ if latestHeaderNodeC.Number.Uint64() < uint64(metadata.MessageCount)-1 {
+ <-time.After(10 * time.Millisecond)
+ } else {
+ break
+ }
+ }
 // Fetching message count - 1 instead on the latest block number as the latest block number might not be
 // present in the snap sync node since it does not the sequencer feed.
 header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(metadata.MessageCount)-1))

From 045018c03b526188f79428c64df65af6d5f4d538 Mon Sep 17 00:00:00 2001
From: Aman Sanghi
Date: Wed, 15 May 2024 10:30:00 +0530
Subject: [PATCH 16/21] refactor

---
 system_tests/snap_sync_test.go | 56 ++++++++++++++++++----------------
 1 file changed, 29 insertions(+), 27 deletions(-)

diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go
index fc7ebc1d33..e531b62f8a 100644
--- a/system_tests/snap_sync_test.go
+++ b/system_tests/snap_sync_test.go
@@ -105,6 +105,7 @@ func TestSnapSync(t *testing.T) {
 }
 }
 // Wait for nodeB to sync up to the first node
+ finalMessageCount := uint64(0)
 for {
 count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount()
 Require(t, err)
@@ -123,35 +124,36 @@ func TestSnapSync(t *testing.T) {
 if metadata != metadataNodeC {
 t.Error("Batch metadata mismatch")
 }
- for {
- latestHeader, err := builder.L2.Client.HeaderByNumber(ctx, nil)
- Require(t, err)
- if latestHeader.Number.Uint64() < uint64(metadata.MessageCount)-1 {
- <-time.After(10 * time.Millisecond)
- } else {
- break
- }
- }
- for {
- latestHeaderNodeC, err := nodeC.Client.HeaderByNumber(ctx, nil)
- Require(t, err)
- if latestHeaderNodeC.Number.Uint64() < uint64(metadata.MessageCount)-1 {
- <-time.After(10 * time.Millisecond)
- } else {
- break
- }
- }
- // Fetching message count - 1 instead on the latest block number as the latest block number might not be
- // present in the snap sync node since it does not the sequencer feed.
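// Why PATCH 15 polls before fetching a fixed block number: asking ethclient
// for a header the node has not produced yet fails (go-ethereum reports
// ethereum.NotFound), so the test must first wait for both nodes to reach
// the target height. A sketch of that probe; headerReady is an illustrative
// name, not a repo helper.
package snippet

import (
	"context"
	"errors"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/ethclient"
)

// headerReady reports whether the node behind c has produced block n yet.
func headerReady(ctx context.Context, c *ethclient.Client, n uint64) (bool, error) {
	_, err := c.HeaderByNumber(ctx, new(big.Int).SetUint64(n))
	if errors.Is(err, ethereum.NotFound) {
		return false, nil // not produced yet; callers should keep polling
	}
	return err == nil, err
}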
- header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(metadata.MessageCount)-1))
+ finalMessageCount = uint64(metadata.MessageCount)
+ break
+ }
+ for {
+ latestHeader, err := builder.L2.Client.HeaderByNumber(ctx, nil)
 Require(t, err)
- headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, big.NewInt(int64(metadata.MessageCount)-1))
+ if latestHeader.Number.Uint64() < uint64(finalMessageCount)-1 {
+ <-time.After(10 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ for {
+ latestHeaderNodeC, err := nodeC.Client.HeaderByNumber(ctx, nil)
 Require(t, err)
- // Once the node is synced up, check if the block hash is the same for the last block
- // This is to ensure that the snap sync worked correctly
- if header.Hash().Cmp(headerNodeC.Hash()) != 0 {
- t.Error("Block hash mismatch")
+ if latestHeaderNodeC.Number.Uint64() < uint64(finalMessageCount)-1 {
+ <-time.After(10 * time.Millisecond)
+ } else {
+ break
 }
- break
+ }
+ // Fetching message count - 1 instead on the latest block number as the latest block number might not be
+ // present in the snap sync node since it does not have the sequencer feed.
+ header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1))
+ Require(t, err)
+ headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1))
+ Require(t, err)
+ // Once the node is synced up, check if the block hash is the same for the last block
+ // This is to ensure that the snap sync worked correctly
+ if header.Hash().Cmp(headerNodeC.Hash()) != 0 {
+ t.Error("Block hash mismatch")
 }
 }

From 1c4f76eb52bcca39975a05939d26ae4a2e44fcd5 Mon Sep 17 00:00:00 2001
From: Aman Sanghi
Date: Wed, 15 May 2024 10:31:41 +0530
Subject: [PATCH 17/21] refactor

---
 system_tests/snap_sync_test.go | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go
index e531b62f8a..a0a349a1d9 100644
--- a/system_tests/snap_sync_test.go
+++ b/system_tests/snap_sync_test.go
@@ -5,14 +5,16 @@ package arbtest

 import (
 "context"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/params"
- "github.com/offchainlabs/nitro/arbos/l2pricing"
- "github.com/offchainlabs/nitro/util"
 "math/big"
 "os"
 "testing"
 "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/params"
+
+ "github.com/offchainlabs/nitro/arbos/l2pricing"
+ "github.com/offchainlabs/nitro/util"
 )

 func TestSnapSync(t *testing.T) {
@@ -146,7 +148,7 @@ func TestSnapSync(t *testing.T) {
 }
 }
 // Fetching message count - 1 instead on the latest block number as the latest block number might not be
- // present in the snap sync node since it does not the sequencer feed.
+ // present in the snap sync node since it does not have the sequencer feed.
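// The test compares block finalMessageCount-1 on both nodes. A sketch of
// that check as a helper, assuming the mapping these tests rely on, where M
// messages yield blocks 0..M-1 (so the last block a synced node must have
// is M-1); sameBlockHashAt is an illustrative name, not a repo helper.
package snippet

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum/ethclient"
)

// sameBlockHashAt reports whether clients a and b agree on the block at
// height n; a mismatch after both nodes reach n indicates divergent state.
func sameBlockHashAt(ctx context.Context, a, b *ethclient.Client, n uint64) (bool, error) {
	num := new(big.Int).SetUint64(n)
	headerA, err := a.HeaderByNumber(ctx, num)
	if err != nil {
		return false, err
	}
	headerB, err := b.HeaderByNumber(ctx, num)
	if err != nil {
		return false, err
	}
	return headerA.Hash() == headerB.Hash(), nil
}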
header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1)) Require(t, err) headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1)) From 45a23a4163161259fda63dda394a26b7092106f9 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 28 May 2024 18:35:38 +0530 Subject: [PATCH 18/21] Changes based on PR comments --- arbnode/inbox_tracker.go | 29 +++--- system_tests/snap_sync_test.go | 164 +++++++++++++++++---------------- 2 files changed, 102 insertions(+), 91 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 1aa0ff7687..b950c1e1ef 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -388,7 +388,7 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) { func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error { var nextAcc common.Hash - firstBatchToKeep := uint64(0) + firstDelayedMsgToKeep := uint64(0) if len(messages) == 0 { return nil } @@ -397,19 +397,22 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR return err } if t.snapSyncConfig.Enabled && pos < t.snapSyncConfig.DelayedCount { - firstBatchToKeep = t.snapSyncConfig.DelayedCount - if firstBatchToKeep > 0 { - firstBatchToKeep-- + firstDelayedMsgToKeep = t.snapSyncConfig.DelayedCount + if firstDelayedMsgToKeep > 0 { + firstDelayedMsgToKeep-- } - for len(messages) > 0 { + for { + if len(messages) == 0 { + return nil + } pos, err = messages[0].Message.Header.SeqNum() if err != nil { return err } - if pos+1 == firstBatchToKeep { + if pos+1 == firstDelayedMsgToKeep { nextAcc = messages[0].AfterInboxAcc() } - if pos < firstBatchToKeep { + if pos < firstDelayedMsgToKeep { messages = messages[1:] } else { break @@ -419,11 +422,6 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR t.mutex.Lock() defer t.mutex.Unlock() - pos, err = messages[0].Message.Header.SeqNum() - if err != nil { - return err - } - if !hardReorg { // This math is safe to do as we know len(messages) > 0 haveLastAcc, err := t.GetDelayedAcc(pos + uint64(len(messages)) - 1) @@ -437,7 +435,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR } } - if pos > firstBatchToKeep { + if pos > firstDelayedMsgToKeep { var err error nextAcc, err = t.GetDelayedAcc(pos - 1) if err != nil { @@ -636,7 +634,10 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L if sequenceNumberToKeep > 0 { sequenceNumberToKeep-- } - for len(batches) > 0 { + for { + if len(batches) == 0 { + return nil + } if batches[0].SequenceNumber+1 == sequenceNumberToKeep { nextAcc = batches[0].AfterInboxAcc prevbatchmeta = BatchMetadata{ diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index a0a349a1d9..37dc964503 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -11,8 +11,10 @@ import ( "time" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/params" + "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos/l2pricing" "github.com/offchainlabs/nitro/util" ) @@ -42,49 +44,15 @@ func TestSnapSync(t *testing.T) { builder.L2Info.GenerateAccount("BackgroundUser") // Create transactions till batch count is 10 - for { - tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) - err := 
builder.L2.Client.SendTransaction(ctx, tx) - Require(t, err) - _, err = builder.L2.EnsureTxSucceeded(tx) - Require(t, err) - count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - Require(t, err) - if count > 10 { - break - } - - } + createTransactionTillBatchCount(ctx, t, builder, 10) // Wait for nodeB to sync up to the first node - for { - header, err := builder.L2.Client.HeaderByNumber(ctx, nil) - Require(t, err) - headerNodeB, err := nodeB.Client.HeaderByNumber(ctx, nil) - Require(t, err) - if header.Number.Cmp(headerNodeB.Number) == 0 { - break - } else { - <-time.After(10 * time.Millisecond) - } - } + waitForBlocksToCatchup(ctx, t, builder.L2.Client, nodeB.Client) - batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - Require(t, err) - // Last batch is batchCount - 1, so prev batch is batchCount - 2 - prevBatchMetaData, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(batchCount - 2) - Require(t, err) - prevMessage, err := builder.L2.ConsensusNode.TxStreamer.GetMessage(prevBatchMetaData.MessageCount - 1) - Require(t, err) // Create a config with snap sync enabled and same database directory as the 2nd node - nodeConfig := builder.nodeConfig - nodeConfig.SnapSyncTest.Enabled = true - nodeConfig.SnapSyncTest.BatchCount = batchCount - nodeConfig.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1 - nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead - nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) + nodeConfig := createNodeConfigWithSnapSync(t, builder) // Cleanup the message data of 2nd node, but keep the block state data. // This is to simulate a snap sync environment where we’ve just gotten the block state but don’t have any messages. - err = os.RemoveAll(nodeB.ConsensusNode.Stack.ResolvePath("arbitrumdata")) + err := os.RemoveAll(nodeB.ConsensusNode.Stack.ResolvePath("arbitrumdata")) Require(t, err) // Cleanup the 2nd node to release the database lock @@ -94,68 +62,110 @@ func TestSnapSync(t *testing.T) { defer cleanupC() // Create transactions till batch count is 20 + createTransactionTillBatchCount(ctx, t, builder, 20) + // Wait for nodeB to sync up to the first node + waitForBatchCountToCatchup(t, builder.L2.ConsensusNode.InboxTracker, nodeC.ConsensusNode.InboxTracker) + // Once the node is synced up, check if the batch metadata is the same for the last batch + // This is to ensure that the snap sync worked correctly + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + metadata, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(count - 1) + Require(t, err) + metadataNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(count - 1) + Require(t, err) + if metadata != metadataNodeC { + t.Error("Batch metadata mismatch") + } + finalMessageCount := uint64(metadata.MessageCount) + waitForBlockToCatchupToMessageCount(ctx, t, builder.L2.Client, finalMessageCount) + waitForBlockToCatchupToMessageCount(ctx, t, nodeC.Client, finalMessageCount) + // Fetching message count - 1 instead on the latest block number as the latest block number might not be + // present in the snap sync node since it does not have the sequencer feed. 
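// The `metadata != metadataNodeC` check above relies on Go comparing structs
// field by field, which is only legal when every field is comparable. A
// reduced stand-in (the field set here is illustrative, not the exact
// BatchMetadata definition):
package main

import "fmt"

type batchMeta struct {
	Accumulator         [32]byte
	MessageCount        uint64
	DelayedMessageCount uint64
	ParentChainBlock    uint64
}

func main() {
	a := batchMeta{MessageCount: 10, DelayedMessageCount: 2, ParentChainBlock: 7}
	b := a
	fmt.Println(a != b) // false: every field matches

	b.ParentChainBlock++
	fmt.Println(a != b) // true: any differing field breaks equality
}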
+ header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1)) + Require(t, err) + headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1)) + Require(t, err) + // Once the node is synced up, check if the block hash is the same for the last block + // This is to ensure that the snap sync worked correctly + if header.Hash().Cmp(headerNodeC.Hash()) != 0 { + t.Error("Block hash mismatch") + } +} + +func waitForBlockToCatchupToMessageCount( + ctx context.Context, + t *testing.T, + client *ethclient.Client, + finalMessageCount uint64, +) { for { - tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) - err := builder.L2.Client.SendTransaction(ctx, tx) + latestHeaderNodeC, err := client.HeaderByNumber(ctx, nil) Require(t, err) - _, err = builder.L2.EnsureTxSucceeded(tx) - Require(t, err) - count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - Require(t, err) - if count > 20 { + if latestHeaderNodeC.Number.Uint64() < uint64(finalMessageCount)-1 { + <-time.After(10 * time.Millisecond) + } else { break } } - // Wait for nodeB to sync up to the first node - finalMessageCount := uint64(0) +} + +func waitForBlocksToCatchup(ctx context.Context, t *testing.T, clientA *ethclient.Client, clientB *ethclient.Client) { for { - count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + headerA, err := clientA.HeaderByNumber(ctx, nil) Require(t, err) - countNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchCount() + headerB, err := clientB.HeaderByNumber(ctx, nil) Require(t, err) - if count != countNodeC { + if headerA.Number.Cmp(headerB.Number) != 0 { <-time.After(10 * time.Millisecond) - continue - } - // Once the node is synced up, check if the batch metadata is the same for the last batch - // This is to ensure that the snap sync worked correctly - metadata, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(count - 1) - Require(t, err) - metadataNodeC, err := nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(countNodeC - 1) - Require(t, err) - if metadata != metadataNodeC { - t.Error("Batch metadata mismatch") + } else { + break } - finalMessageCount = uint64(metadata.MessageCount) - break } +} + +func waitForBatchCountToCatchup(t *testing.T, inboxTrackerA *arbnode.InboxTracker, inboxTrackerB *arbnode.InboxTracker) { for { - latestHeader, err := builder.L2.Client.HeaderByNumber(ctx, nil) + countA, err := inboxTrackerA.GetBatchCount() Require(t, err) - if latestHeader.Number.Uint64() < uint64(finalMessageCount)-1 { + countB, err := inboxTrackerB.GetBatchCount() + Require(t, err) + if countA != countB { <-time.After(10 * time.Millisecond) } else { break } } +} + +func createTransactionTillBatchCount(ctx context.Context, t *testing.T, builder *NodeBuilder, finalCount uint64) { for { - latestHeaderNodeC, err := nodeC.Client.HeaderByNumber(ctx, nil) + tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) + err := builder.L2.Client.SendTransaction(ctx, tx) Require(t, err) - if latestHeaderNodeC.Number.Uint64() < uint64(finalMessageCount)-1 { - <-time.After(10 * time.Millisecond) - } else { + _, err = builder.L2.EnsureTxSucceeded(tx) + Require(t, err) + count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + if count > finalCount { break } } - // Fetching message count - 1 instead on the latest block number as the latest block number might not be - // present in the 
snap sync node since it does not have the sequencer feed. - header, err := builder.L2.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1)) +} + +func createNodeConfigWithSnapSync(t *testing.T, builder *NodeBuilder) *arbnode.Config { + batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) - headerNodeC, err := nodeC.Client.HeaderByNumber(ctx, big.NewInt(int64(finalMessageCount)-1)) + // Last batch is batchCount - 1, so prev batch is batchCount - 2 + prevBatchMetaData, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(batchCount - 2) Require(t, err) - // Once the node is synced up, check if the block hash is the same for the last block - // This is to ensure that the snap sync worked correctly - if header.Hash().Cmp(headerNodeC.Hash()) != 0 { - t.Error("Block hash mismatch") - } + prevMessage, err := builder.L2.ConsensusNode.TxStreamer.GetMessage(prevBatchMetaData.MessageCount - 1) + Require(t, err) + // Create a config with snap sync enabled and same database directory as the 2nd node + nodeConfig := builder.nodeConfig + nodeConfig.SnapSyncTest.Enabled = true + nodeConfig.SnapSyncTest.BatchCount = batchCount + nodeConfig.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1 + nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead + nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount) + return nodeConfig } From bd6e9ea0ba3c75e964a9be0b2dbf701481d10d9f Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Mon, 3 Jun 2024 20:15:47 +0530 Subject: [PATCH 19/21] fix tests --- system_tests/snap_sync_test.go | 50 ++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 37dc964503..95c499a641 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -37,7 +37,9 @@ func TestSnapSync(t *testing.T) { // 2nd node without sequencer, syncs up to the first node. // This node will be stopped in middle and arbitrumdata will be deleted. - nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{}) + testDir := t.TempDir() + nodeBStack := createStackConfigForTest(testDir) + nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: nodeBStack}) builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) @@ -58,13 +60,13 @@ func TestSnapSync(t *testing.T) { // Cleanup the 2nd node to release the database lock cleanupB() // New node with snap sync enabled, and the same database directory as the 2nd node but with no message data. 
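// A reduced restatement (stand-in types and illustrative names, not the
// repo's API) of the arithmetic in createNodeConfigWithSnapSync above: with
// batchCount batches present, the test targets the second-to-last batch
// (batchCount - 2) and records where that batch ended.
package main

import "fmt"

type batchMeta struct {
	MessageCount        uint64
	DelayedMessageCount uint64
}

type snapTargets struct {
	BatchCount            uint64 // mirrors batchCount in the test
	DelayedCount          uint64 // mirrors prevBatchMetaData.DelayedMessageCount - 1
	PrevBatchMessageCount uint64 // mirrors prevBatchMetaData.MessageCount
}

func deriveTargets(batchCount uint64, prevBatch batchMeta) snapTargets {
	return snapTargets{
		BatchCount:            batchCount,
		DelayedCount:          prevBatch.DelayedMessageCount - 1,
		PrevBatchMessageCount: prevBatch.MessageCount,
	}
}

func main() {
	// Example: 12 batches, where batch 10 ended at message 240 after
	// reading 5 delayed messages.
	fmt.Printf("%+v\n", deriveTargets(12, batchMeta{MessageCount: 240, DelayedMessageCount: 5}))
}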
- nodeC, cleanupC := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: nodeB.ConsensusNode.Stack.Config(), nodeConfig: nodeConfig}) + nodeC, cleanupC := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: nodeBStack, nodeConfig: nodeConfig}) defer cleanupC() // Create transactions till batch count is 20 createTransactionTillBatchCount(ctx, t, builder, 20) // Wait for nodeB to sync up to the first node - waitForBatchCountToCatchup(t, builder.L2.ConsensusNode.InboxTracker, nodeC.ConsensusNode.InboxTracker) + waitForBatchCountToCatchup(ctx, t, builder.L2.ConsensusNode.InboxTracker, nodeC.ConsensusNode.InboxTracker) // Once the node is synced up, check if the batch metadata is the same for the last batch // This is to ensure that the snap sync worked correctly count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() @@ -111,34 +113,42 @@ func waitForBlockToCatchupToMessageCount( func waitForBlocksToCatchup(ctx context.Context, t *testing.T, clientA *ethclient.Client, clientB *ethclient.Client) { for { - headerA, err := clientA.HeaderByNumber(ctx, nil) - Require(t, err) - headerB, err := clientB.HeaderByNumber(ctx, nil) - Require(t, err) - if headerA.Number.Cmp(headerB.Number) != 0 { - <-time.After(10 * time.Millisecond) - } else { - break + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + headerA, err := clientA.HeaderByNumber(ctx, nil) + Require(t, err) + headerB, err := clientB.HeaderByNumber(ctx, nil) + Require(t, err) + if headerA.Number.Cmp(headerB.Number) == 0 { + return + } } } } -func waitForBatchCountToCatchup(t *testing.T, inboxTrackerA *arbnode.InboxTracker, inboxTrackerB *arbnode.InboxTracker) { +func waitForBatchCountToCatchup(ctx context.Context, t *testing.T, inboxTrackerA *arbnode.InboxTracker, inboxTrackerB *arbnode.InboxTracker) { for { - countA, err := inboxTrackerA.GetBatchCount() - Require(t, err) - countB, err := inboxTrackerB.GetBatchCount() - Require(t, err) - if countA != countB { - <-time.After(10 * time.Millisecond) - } else { - break + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + countA, err := inboxTrackerA.GetBatchCount() + Require(t, err) + countB, err := inboxTrackerB.GetBatchCount() + Require(t, err) + if countA == countB { + return + } } + } } func createTransactionTillBatchCount(ctx context.Context, t *testing.T, builder *NodeBuilder, finalCount uint64) { for { + Require(t, ctx.Err()) tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, big.NewInt(1), nil) err := builder.L2.Client.SendTransaction(ctx, tx) Require(t, err) From 80ee48a964c8f8e24f076d4ed49e1c5e842057af Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Mon, 3 Jun 2024 20:17:43 +0530 Subject: [PATCH 20/21] Changes based on PR comments --- system_tests/snap_sync_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index 95c499a641..27d06ed489 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -101,12 +101,15 @@ func waitForBlockToCatchupToMessageCount( finalMessageCount uint64, ) { for { - latestHeaderNodeC, err := client.HeaderByNumber(ctx, nil) - Require(t, err) - if latestHeaderNodeC.Number.Uint64() < uint64(finalMessageCount)-1 { - <-time.After(10 * time.Millisecond) - } else { - break + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + latestHeaderNodeC, err := client.HeaderByNumber(ctx, nil) + 
Require(t, err)
+ if latestHeaderNodeC.Number.Uint64() >= uint64(finalMessageCount)-1 {
+ return
+ }
 }
 }
 }

From b82d5ecf46615472190495008067f5aac72fe052 Mon Sep 17 00:00:00 2001
From: Aman Sanghi
Date: Mon, 3 Jun 2024 20:41:46 +0530
Subject: [PATCH 21/21] Changes based on PR comments

---
 system_tests/snap_sync_test.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go
index 27d06ed489..dd22bb027c 100644
--- a/system_tests/snap_sync_test.go
+++ b/system_tests/snap_sync_test.go
@@ -92,6 +92,11 @@ func TestSnapSync(t *testing.T) {
 if header.Hash().Cmp(headerNodeC.Hash()) != 0 {
 t.Error("Block hash mismatch")
 }
+ // This is to ensure that the node did a snap sync and did not sync the batch before the snap sync batch.
+ _, err = nodeC.ConsensusNode.InboxTracker.GetBatchMetadata(nodeConfig.SnapSyncTest.BatchCount - 3)
+ if err == nil {
+ t.Error("Batch metadata should not be present for the batch before the snap sync batch")
+ }
 }

 func waitForBlockToCatchupToMessageCount(