Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-1264] V0 Snap Sync #2265

Merged
merged 35 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1588fb7
V0 Snap Sync
amsanghi Apr 30, 2024
0a4b8b7
fixes
amsanghi May 2, 2024
fdf0772
Merge branch 'master' into snap_sync
amsanghi May 2, 2024
b718627
fix typo
amsanghi May 2, 2024
778eb25
cleanup
amsanghi May 2, 2024
a6d0c92
fix test
amsanghi May 2, 2024
ac9f171
fix
amsanghi May 6, 2024
7199df5
Merge branch 'master' into snap_sync
amsanghi May 6, 2024
54b5d7f
Merge branch 'master' into snap_sync
tsahee May 6, 2024
1309d92
Merge branch 'master' into snap_sync
amsanghi May 7, 2024
9259495
Changes based on PR comments
amsanghi May 7, 2024
b71bcd4
Changes based on PR comments
amsanghi May 7, 2024
edfa8f0
Merge branch 'master' into snap_sync
amsanghi May 9, 2024
c57bbae
Changes based on PR comments
amsanghi May 9, 2024
b22cf8f
Merge branch 'master' into snap_sync
amsanghi May 13, 2024
a4d4aeb
Changes based on PR comments
amsanghi May 14, 2024
95c79c2
minor fix
amsanghi May 14, 2024
3dbe2cd
Changes based on PR comments
amsanghi May 14, 2024
551e509
Merge branch 'master' into snap_sync
amsanghi May 14, 2024
46ddd69
Changes based on PR comments
amsanghi May 14, 2024
21208fb
fix test
amsanghi May 15, 2024
ddde138
fix race condition
amsanghi May 15, 2024
045018c
refractor
amsanghi May 15, 2024
1c4f76e
refractor
amsanghi May 15, 2024
4693a7a
Merge branch 'master' into snap_sync
amsanghi May 20, 2024
1fbb0cb
Merge branch 'master' into snap_sync
amsanghi May 28, 2024
45a23a4
Changes based on PR comments
amsanghi May 28, 2024
c8a3c16
Merge branch 'master' into snap_sync
amsanghi May 29, 2024
099cfac
Merge branch 'master' into snap_sync
amsanghi Jun 3, 2024
bd6e9ea
fix tests
amsanghi Jun 3, 2024
80ee48a
Changes based on PR comments
amsanghi Jun 3, 2024
b82d5ec
Changes based on PR comments
amsanghi Jun 3, 2024
4eaaced
Merge branch 'master' into snap_sync
amsanghi Jun 3, 2024
9889555
Merge branch 'master' into snap_sync
tsahee Jun 3, 2024
f299dfc
Merge branch 'master' into snap_sync
tsahee Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil)
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
3 changes: 3 additions & 0 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (r *InboxReader) Start(ctxIn context.Context) error {
return err
}
if batchCount > 0 {
if r.tracker.snapSyncConfig.Enabled {
tsahee marked this conversation as resolved.
Show resolved Hide resolved
break
}
// Validate the init message matches our L2 blockchain
message, err := r.tracker.GetDelayedMessage(0)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
}
execEngine.Initialize(gethexec.DefaultCachingConfig.StylusLRUCache)
execSeq := &execClientWrapper{execEngine, t}
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher)
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
if err != nil {
Fail(t, err)
}
Expand Down
88 changes: 70 additions & 18 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ var (
)

type InboxTracker struct {
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
dapReaders []daprovider.Reader
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
dapReaders []daprovider.Reader
snapSyncConfig SnapSyncConfig

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders []daprovider.Reader, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
}
return tracker, nil
}
Expand Down Expand Up @@ -385,16 +387,40 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) {
}

func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error {
var nextAcc common.Hash
firstDelayedMsgToKeep := uint64(0)
if len(messages) == 0 {
return nil
}
t.mutex.Lock()
defer t.mutex.Unlock()

pos, err := messages[0].Message.Header.SeqNum()
if err != nil {
return err
}
if t.snapSyncConfig.Enabled && pos < t.snapSyncConfig.DelayedCount {
firstDelayedMsgToKeep = t.snapSyncConfig.DelayedCount
if firstDelayedMsgToKeep > 0 {
firstDelayedMsgToKeep--
}
for {
if len(messages) == 0 {
return nil
}
pos, err = messages[0].Message.Header.SeqNum()
if err != nil {
return err
}
if pos+1 == firstDelayedMsgToKeep {
nextAcc = messages[0].AfterInboxAcc()
}
if pos < firstDelayedMsgToKeep {
messages = messages[1:]
} else {
break
}
}
}
t.mutex.Lock()
defer t.mutex.Unlock()

if !hardReorg {
// This math is safe to do as we know len(messages) > 0
Expand All @@ -409,8 +435,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
}
}

var nextAcc common.Hash
if pos > 0 {
if pos > firstDelayedMsgToKeep {
var err error
nextAcc, err = t.GetDelayedAcc(pos - 1)
if err != nil {
Expand Down Expand Up @@ -598,17 +623,44 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco
var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different")

func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch) error {
var nextAcc common.Hash
var prevbatchmeta BatchMetadata
sequenceNumberToKeep := uint64(0)
if len(batches) == 0 {
return nil
}
if t.snapSyncConfig.Enabled && batches[0].SequenceNumber < t.snapSyncConfig.BatchCount {
sequenceNumberToKeep = t.snapSyncConfig.BatchCount
if sequenceNumberToKeep > 0 {
sequenceNumberToKeep--
}
for {
if len(batches) == 0 {
return nil
}
if batches[0].SequenceNumber+1 == sequenceNumberToKeep {
nextAcc = batches[0].AfterInboxAcc
prevbatchmeta = BatchMetadata{
Accumulator: batches[0].AfterInboxAcc,
DelayedMessageCount: batches[0].AfterDelayedCount,
MessageCount: arbutil.MessageIndex(t.snapSyncConfig.PrevBatchMessageCount),
ParentChainBlock: batches[0].ParentChainBlockNumber,
}
}
if batches[0].SequenceNumber < sequenceNumberToKeep {
batches = batches[1:]
} else {
break
}
}
}
t.mutex.Lock()
defer t.mutex.Unlock()

pos := batches[0].SequenceNumber
startPos := pos
var nextAcc common.Hash
var prevbatchmeta BatchMetadata
if pos > 0 {

if pos > sequenceNumberToKeep {
var err error
prevbatchmeta, err = t.GetBatchMetadata(pos - 1)
nextAcc = prevbatchmeta.Accumulator
Expand Down
23 changes: 21 additions & 2 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Config struct {
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
// SnapSyncConfig is only used for testing purposes, these should not be configured in production.
SnapSyncTest SnapSyncConfig
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -175,6 +177,7 @@ var ConfigDefault = Config{
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceMgmt: resourcemanager.DefaultConfig,
Maintenance: DefaultMaintenanceConfig,
SnapSyncTest: DefaultSnapSyncConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down Expand Up @@ -273,6 +276,22 @@ type Node struct {
ctx context.Context
}

type SnapSyncConfig struct {
Enabled bool
PrevBatchMessageCount uint64
PrevDelayedRead uint64
BatchCount uint64
DelayedCount uint64
}

var DefaultSnapSyncConfig = SnapSyncConfig{
Enabled: false,
PrevBatchMessageCount: 0,
BatchCount: 0,
DelayedCount: 0,
PrevDelayedRead: 0,
}

type ConfigFetcher interface {
Get() *Config
Start(context.Context)
Expand Down Expand Up @@ -410,7 +429,7 @@ func createNodeImpl(
}

transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher)
txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -540,7 +559,7 @@ func createNodeImpl(
if blobReader != nil {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
}
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type TransactionStreamer struct {
execLastMsgCount arbutil.MessageIndex
validator *staker.BlockValidator

db ethdb.Database
fatalErrChan chan<- error
config TransactionStreamerConfigFetcher
db ethdb.Database
fatalErrChan chan<- error
config TransactionStreamerConfigFetcher
snapSyncConfig *SnapSyncConfig

insertionMutex sync.Mutex // cannot be acquired while reorgMutex is held
reorgMutex sync.RWMutex
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewTransactionStreamer(
broadcastServer *broadcaster.Broadcaster,
fatalErrChan chan<- error,
config TransactionStreamerConfigFetcher,
snapSyncConfig *SnapSyncConfig,
) (*TransactionStreamer, error) {
streamer := &TransactionStreamer{
exec: exec,
Expand All @@ -115,6 +117,7 @@ func NewTransactionStreamer(
broadcastServer: broadcastServer,
fatalErrChan: fatalErrChan,
config: config,
snapSyncConfig: snapSyncConfig,
cachedL1PriceData: &L1PriceData{
msgToL1PriceData: []L1PriceDataOfMsg{},
},
Expand Down Expand Up @@ -793,6 +796,9 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
}

func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) {
if s.snapSyncConfig.Enabled && uint64(pos) == s.snapSyncConfig.PrevBatchMessageCount {
return s.snapSyncConfig.PrevDelayedRead, nil
}
var prevDelayedRead uint64
if pos > 0 {
prevMsg, err := s.GetMessage(pos - 1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, arbnode.DefaultSnapSyncConfig)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading