Skip to content

Commit

Permalink
snap-sync consensus v0.3: wait for message to get snap sync parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed May 20, 2024
1 parent 2cea07c commit debd249
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 52 deletions.
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
tracker, err := NewInboxTracker(db, streamer, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
43 changes: 41 additions & 2 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type InboxReaderConfig struct {
TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"`
MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"`
ReadMode string `koanf:"read-mode" reload:"hot"`
EnableSnapSync bool `koanf:"enable-snap-sync"`
// SnapSyncTest is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}

type InboxReaderConfigFetcher func() *InboxReaderConfig
Expand All @@ -56,6 +59,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once")
f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once")
f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized")
f.Bool(prefix+".enable-snap-sync", false, "enable snap sync")
}

var DefaultInboxReaderConfig = InboxReaderConfig{
Expand Down Expand Up @@ -95,13 +99,14 @@ type InboxReader struct {
caughtUpChan chan struct{}
client arbutil.L1Interface
l1Reader *headerreader.HeaderReader
rollupAddress common.Address

// Atomic
lastSeenBatchCount uint64
lastReadBatchCount uint64
}

func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, rollupAddress common.Address, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
err := config().Validate()
if err != nil {
return nil, err
Expand All @@ -115,12 +120,40 @@ func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader
firstMessageBlock: firstMessageBlock,
caughtUpChan: make(chan struct{}),
config: config,
rollupAddress: rollupAddress,
}, nil
}

func (r *InboxReader) Start(ctxIn context.Context) error {
r.StopWaiter.Start(ctxIn, r)
hadError := false
if r.config().EnableSnapSync {
batchCount, err := r.tracker.GetBatchCount()
if err != nil {
return err
}
if batchCount == 0 {
snapSyncConfig := r.fetchSnapSyncParameters()
r.tracker.SetSnapSyncParameters(snapSyncConfig)
r.tracker.txStreamer.SetSnapSyncParameters(snapSyncConfig)
firstMessageToRead := snapSyncConfig.DelayedCount
if firstMessageToRead > snapSyncConfig.BatchCount {
firstMessageToRead = snapSyncConfig.BatchCount
}
if firstMessageToRead > 0 {
firstMessageToRead--
}
// Find the first block containing the first message to read
// Subtract 1 to get the block before the first message to read,
// this is done to fetch previous batch metadata needed for snap sync.
block, err := FindBlockContainingBatch(ctxIn, r.rollupAddress, r.client, snapSyncConfig.ParentChainAssertionBlock, firstMessageToRead-1)
if err != nil {
return err
}
r.firstMessageBlock.SetUint64(block)
}

}
r.CallIteratively(func(ctx context.Context) time.Duration {
err := r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
Expand All @@ -139,7 +172,7 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
return err
}
if batchCount > 0 {
if r.tracker.snapSyncConfig.Enabled {
if r.config().EnableSnapSync {
break
}
// Validate the init message matches our L2 blockchain
Expand Down Expand Up @@ -172,6 +205,12 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
return nil
}

func (r *InboxReader) fetchSnapSyncParameters() SnapSyncConfig {
// In the future, we will implement a way to fetch this is from other nodes,
// but for now we will just use the test config
return r.config().SnapSyncTest
}

// assumes l1block is recent so we could do a simple-search from the end
func (r *InboxReader) recentParentChainBlockToMsg(ctx context.Context, parentChainBlock uint64) (arbutil.MessageIndex, error) {
batch, err := r.tracker.GetBatchCount()
Expand Down
2 changes: 1 addition & 1 deletion arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
Fail(t, err)
}
execSeq := &execClientWrapper{execEngine, t}
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher)
if err != nil {
Fail(t, err)
}
Expand Down
15 changes: 9 additions & 6 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ type InboxTracker struct {
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
}
Expand Down Expand Up @@ -225,6 +224,10 @@ func (t *InboxTracker) GetBatchCount() (uint64, error) {
return count, nil
}

func (t *InboxTracker) SetSnapSyncParameters(config SnapSyncConfig) {
t.snapSyncConfig = config
}

// err will return unexpected/internal errors
// bool will be false if batch not found (meaning, block not yet posted on a batch)
func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) {
Expand Down
36 changes: 3 additions & 33 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ type Config struct {
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -177,7 +175,6 @@ var ConfigDefault = Config{
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -285,15 +282,6 @@ type SnapSyncConfig struct {
ParentChainAssertionBlock uint64
}

var DefaultSnapSyncConfig = SnapSyncConfig{
Enabled: false,
PrevBatchMessageCount: 0,
PrevDelayedRead: 0,
BatchCount: 0,
DelayedCount: 0,
ParentChainAssertionBlock: 0,
}

type ConfigFetcher interface {
Get() *Config
Start(context.Context)
Expand Down Expand Up @@ -431,7 +419,7 @@ func createNodeImpl(
}

transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -561,29 +549,11 @@ func createNodeImpl(
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders)
if err != nil {
return nil, err
}
firstMessageBlock := new(big.Int).SetUint64(deployInfo.DeployedAt)
if config.SnapSyncTest.Enabled {
firstMessageToRead := config.SnapSyncTest.DelayedCount
if firstMessageToRead > config.SnapSyncTest.BatchCount {
firstMessageToRead = config.SnapSyncTest.BatchCount
}
if firstMessageToRead > 0 {
firstMessageToRead--
}
// Find the first block containing the first message to read
// Subtract 1 to get the block before the first message to read,
// this is done to fetch previous batch metadata needed for snap sync.
block, err := FindBlockContainingBatch(ctx, deployInfo.Rollup, l1client, config.SnapSyncTest.ParentChainAssertionBlock, firstMessageToRead-1)
if err != nil {
return nil, err
}
firstMessageBlock.SetUint64(block)
}
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, firstMessageBlock, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), deployInfo.Rollup, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type TransactionStreamer struct {
db ethdb.Database
fatalErrChan chan<- error
config TransactionStreamerConfigFetcher
snapSyncConfig *SnapSyncConfig
snapSyncConfig SnapSyncConfig

insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
Expand Down Expand Up @@ -107,7 +107,6 @@ func NewTransactionStreamer(
broadcastServer *broadcaster.Broadcaster,
fatalErrChan chan<- error,
config TransactionStreamerConfigFetcher,
snapSyncConfig *SnapSyncConfig,
) (*TransactionStreamer, error) {
streamer := &TransactionStreamer{
exec: exec,
Expand All @@ -117,7 +116,6 @@ func NewTransactionStreamer(
broadcastServer: broadcastServer,
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
cachedL1PriceData: &L1PriceData{
msgToL1PriceData: []L1PriceDataOfMsg{},
},
Expand Down Expand Up @@ -795,6 +793,10 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockHash, batch)
}

func (s *TransactionStreamer) SetSnapSyncParameters(config SnapSyncConfig) {
s.snapSyncConfig = config
}

func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) {
if s.snapSyncConfig.Enabled && uint64(pos) == s.snapSyncConfig.PrevBatchMessageCount {
return s.snapSyncConfig.PrevDelayedRead, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, arbnode.DefaultSnapSyncConfig)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions system_tests/snap_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ func TestSnapSync(t *testing.T) {
Require(t, err)
// Create a config with snap sync enabled and same database directory as the 2nd node
nodeConfig := builder.nodeConfig
nodeConfig.SnapSyncTest.Enabled = true
nodeConfig.SnapSyncTest.BatchCount = batchCount
nodeConfig.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1
nodeConfig.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead
nodeConfig.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount)
nodeConfig.InboxReader.EnableSnapSync = true
nodeConfig.InboxReader.SnapSyncTest.Enabled = true
nodeConfig.InboxReader.SnapSyncTest.BatchCount = batchCount
nodeConfig.InboxReader.SnapSyncTest.DelayedCount = prevBatchMetaData.DelayedMessageCount - 1
nodeConfig.InboxReader.SnapSyncTest.PrevDelayedRead = prevMessage.DelayedMessagesRead
nodeConfig.InboxReader.SnapSyncTest.PrevBatchMessageCount = uint64(prevBatchMetaData.MessageCount)
// Cleanup the message data of 2nd node, but keep the block state data.
// This is to simulate a snap sync environment where we’ve just gotten the block state but don’t have any messages.
err = os.RemoveAll(nodeB.ConsensusNode.Stack.ResolvePath("arbitrumdata"))
Expand Down

0 comments on commit debd249

Please sign in to comment.