From 2737e35e446ea36a57165214e2712c42298c0913 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 31 Jul 2024 16:08:44 -0300 Subject: [PATCH 1/7] Removes HardReorg config from InboxReader --- arbnode/delayed_seq_reorg_test.go | 153 ------------------------------ arbnode/inbox_reader.go | 13 +-- arbnode/inbox_tracker.go | 24 +++-- cmd/nitro/nitro.go | 4 - system_tests/seqinbox_test.go | 1 - 5 files changed, 13 insertions(+), 182 deletions(-) delete mode 100644 arbnode/delayed_seq_reorg_test.go diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go deleted file mode 100644 index 699eb3e8f6..0000000000 --- a/arbnode/delayed_seq_reorg_test.go +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2021-2022, Offchain Labs, Inc. -// For license information, see https://github.com/nitro/blob/master/LICENSE - -package arbnode - -import ( - "context" - "encoding/binary" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/offchainlabs/nitro/arbos/arbostypes" - "github.com/offchainlabs/nitro/solgen/go/bridgegen" -) - -func TestSequencerReorgFromDelayed(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) - tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) - Require(t, err) - - err = streamer.Start(ctx) - Require(t, err) - exec.Start(ctx) - init, err := streamer.GetMessage(0) - Require(t, err) - - initMsgDelayed := &DelayedInboxMessage{ - BlockHash: [32]byte{}, - BeforeInboxAcc: [32]byte{}, - Message: init.Message, - } - delayedRequestId := common.BigToHash(common.Big1) - userDelayed := &DelayedInboxMessage{ - BlockHash: [32]byte{}, - BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(), - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_EndOfBlock, - Poster: [20]byte{}, - BlockNumber: 0, - Timestamp: 0, - RequestId: &delayedRequestId, - L1BaseFee: common.Big0, - }, - }, - } - err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed}, false) - Require(t, err) - - serializedInitMsgBatch := make([]byte, 40) - binary.BigEndian.PutUint64(serializedInitMsgBatch[32:], 1) - initMsgBatch := &SequencerInboxBatch{ - BlockHash: [32]byte{}, - ParentChainBlockNumber: 0, - SequenceNumber: 0, - BeforeInboxAcc: [32]byte{}, - AfterInboxAcc: [32]byte{1}, - AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(), - AfterDelayedCount: 1, - TimeBounds: bridgegen.IBridgeTimeBounds{}, - rawLog: types.Log{}, - dataLocation: 0, - bridgeAddress: [20]byte{}, - serialized: serializedInitMsgBatch, - } - serializedUserMsgBatch := make([]byte, 40) - binary.BigEndian.PutUint64(serializedUserMsgBatch[32:], 2) - userMsgBatch := &SequencerInboxBatch{ - BlockHash: [32]byte{}, - ParentChainBlockNumber: 0, - SequenceNumber: 1, - BeforeInboxAcc: [32]byte{1}, - AfterInboxAcc: [32]byte{2}, - AfterDelayedAcc: userDelayed.AfterInboxAcc(), - AfterDelayedCount: 2, - TimeBounds: bridgegen.IBridgeTimeBounds{}, - rawLog: types.Log{}, - dataLocation: 0, - bridgeAddress: [20]byte{}, - serialized: serializedUserMsgBatch, - } - emptyBatch := &SequencerInboxBatch{ - BlockHash: [32]byte{}, - ParentChainBlockNumber: 0, - SequenceNumber: 2, - BeforeInboxAcc: [32]byte{2}, - AfterInboxAcc: [32]byte{3}, - AfterDelayedAcc: userDelayed.AfterInboxAcc(), - AfterDelayedCount: 2, - TimeBounds: bridgegen.IBridgeTimeBounds{}, - rawLog: types.Log{}, - dataLocation: 0, - bridgeAddress: [20]byte{}, - serialized: serializedUserMsgBatch, - } - err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch}) - Require(t, err) - - // Reorg out the user delayed message - err = tracker.ReorgDelayedTo(1, true) - Require(t, err) - - msgCount, err := streamer.GetMessageCount() - Require(t, err) - if msgCount != 1 { - Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 1)") - } - - delayedCount, err := tracker.GetDelayedCount() - Require(t, err) - if delayedCount != 1 { - Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 1)") - } - - batchCount, err := tracker.GetBatchCount() - Require(t, err) - if batchCount != 1 { - Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)") - } - - emptyBatch = &SequencerInboxBatch{ - BlockHash: [32]byte{}, - ParentChainBlockNumber: 0, - SequenceNumber: 1, - BeforeInboxAcc: [32]byte{1}, - AfterInboxAcc: [32]byte{2}, - AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(), - AfterDelayedCount: 1, - TimeBounds: bridgegen.IBridgeTimeBounds{}, - rawLog: types.Log{}, - dataLocation: 0, - bridgeAddress: [20]byte{}, - serialized: serializedInitMsgBatch, - } - err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch}) - Require(t, err) - - msgCount, err = streamer.GetMessageCount() - Require(t, err) - if msgCount != 2 { - Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 2)") - } - - batchCount, err = tracker.GetBatchCount() - Require(t, err) - if batchCount != 2 { - Fail(t, "Unexpected tracker batch count", batchCount, "(expected 2)") - } -} diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 98104b2ea7..a44d506d41 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -27,7 +27,6 @@ import ( type InboxReaderConfig struct { DelayBlocks uint64 `koanf:"delay-blocks" reload:"hot"` CheckDelay time.Duration `koanf:"check-delay" reload:"hot"` - HardReorg bool `koanf:"hard-reorg" reload:"hot"` MinBlocksToRead uint64 `koanf:"min-blocks-to-read" reload:"hot"` DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"` TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"` @@ -51,7 +50,6 @@ func (c *InboxReaderConfig) Validate() error { func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".delay-blocks", DefaultInboxReaderConfig.DelayBlocks, "number of latest blocks to ignore to reduce reorgs") f.Duration(prefix+".check-delay", DefaultInboxReaderConfig.CheckDelay, "the maximum time to wait between inbox checks (if not enough new blocks are found)") - f.Bool(prefix+".hard-reorg", DefaultInboxReaderConfig.HardReorg, "erase future transactions in addition to overwriting existing ones on reorg") f.Uint64(prefix+".min-blocks-to-read", DefaultInboxReaderConfig.MinBlocksToRead, "the minimum number of blocks to read at once (when caught up lowers load on L1)") f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)") f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") @@ -62,7 +60,6 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { var DefaultInboxReaderConfig = InboxReaderConfig{ DelayBlocks: 0, CheckDelay: time.Minute, - HardReorg: false, MinBlocksToRead: 1, DefaultBlocksToRead: 100, TargetMessagesRead: 500, @@ -73,7 +70,6 @@ var DefaultInboxReaderConfig = InboxReaderConfig{ var TestInboxReaderConfig = InboxReaderConfig{ DelayBlocks: 0, CheckDelay: time.Millisecond * 10, - HardReorg: false, MinBlocksToRead: 1, DefaultBlocksToRead: 100, TargetMessagesRead: 500, @@ -338,7 +334,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { missingDelayed = true } else if ourLatestDelayedCount > checkingDelayedCount { log.Info("backwards reorg of delayed messages", "from", ourLatestDelayedCount, "to", checkingDelayedCount) - err = r.tracker.ReorgDelayedTo(checkingDelayedCount, config.HardReorg) + err = r.tracker.ReorgDelayedTo(checkingDelayedCount) if err != nil { return err } @@ -373,11 +369,6 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { if ourLatestBatchCount < checkingBatchCount { checkingBatchCount = ourLatestBatchCount missingSequencer = true - } else if ourLatestBatchCount > checkingBatchCount && config.HardReorg { - err = r.tracker.ReorgBatchesTo(checkingBatchCount) - if err != nil { - return err - } } if checkingBatchCount > 0 { checkingBatchSeqNum := checkingBatchCount - 1 @@ -566,7 +557,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*SequencerInboxBatch, delayedMessages []*DelayedInboxMessage) (bool, error) { - err := r.tracker.AddDelayedMessages(delayedMessages, r.config().HardReorg) + err := r.tracker.AddDelayedMessages(delayedMessages) if err != nil { return false, err } diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 7686fe413f..0eed2f5e1a 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -404,7 +404,7 @@ func (t *InboxTracker) GetDelayedMessageBytes(ctx context.Context, seqNum uint64 return msg.Serialize() } -func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error { +func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage) error { var nextAcc common.Hash firstDelayedMsgToKeep := uint64(0) if len(messages) == 0 { @@ -440,17 +440,15 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR t.mutex.Lock() defer t.mutex.Unlock() - if !hardReorg { - // This math is safe to do as we know len(messages) > 0 - haveLastAcc, err := t.GetDelayedAcc(pos + uint64(len(messages)) - 1) - if err == nil { - if haveLastAcc == messages[len(messages)-1].AfterInboxAcc() { - // We already have these delayed messages - return nil - } - } else if !errors.Is(err, AccumulatorNotFoundErr) { - return err + // This math is safe to do as we know len(messages) > 0 + haveLastAcc, err := t.GetDelayedAcc(pos + uint64(len(messages)) - 1) + if err == nil { + if haveLastAcc == messages[len(messages)-1].AfterInboxAcc() { + // We already have these delayed messages + return nil } + } else if !errors.Is(err, AccumulatorNotFoundErr) { + return err } if pos > firstDelayedMsgToKeep { @@ -852,7 +850,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client *ethclien return nil } -func (t *InboxTracker) ReorgDelayedTo(count uint64, canReorgBatches bool) error { +func (t *InboxTracker) ReorgDelayedTo(count uint64) error { t.mutex.Lock() defer t.mutex.Unlock() @@ -867,7 +865,7 @@ func (t *InboxTracker) ReorgDelayedTo(count uint64, canReorgBatches bool) error return errors.New("attempted to reorg to future delayed count") } - return t.setDelayedCountReorgAndWriteBatch(t.db.NewBatch(), count, canReorgBatches) + return t.setDelayedCountReorgAndWriteBatch(t.db.NewBatch(), count, false) } func (t *InboxTracker) ReorgBatchesTo(count uint64) error { diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index bc2155a475..1d54e26a01 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -231,10 +231,6 @@ func mainImpl() int { nodeConfig.Node.ParentChainReader.Enable = true } - if nodeConfig.Execution.Sequencer.Enable && nodeConfig.Node.ParentChainReader.Enable && nodeConfig.Node.InboxReader.HardReorg { - flag.Usage() - log.Crit("hard reorgs cannot safely be enabled with sequencer mode enabled") - } if nodeConfig.Execution.Sequencer.Enable != nodeConfig.Node.Sequencer { log.Error("consensus and execution must agree if sequencing is enabled or not", "Execution.Sequencer.Enable", nodeConfig.Execution.Sequencer.Enable, "Node.Sequencer", nodeConfig.Node.Sequencer) } diff --git a/system_tests/seqinbox_test.go b/system_tests/seqinbox_test.go index a9f66b0e2f..e0da2d4f3f 100644 --- a/system_tests/seqinbox_test.go +++ b/system_tests/seqinbox_test.go @@ -139,7 +139,6 @@ func testSequencerInboxReaderImpl(t *testing.T, validator bool) { defer cancel() builder := NewNodeBuilder(ctx).DefaultConfig(t, true) - builder.nodeConfig.InboxReader.HardReorg = true if validator { builder.nodeConfig.BlockValidator.Enable = true } From 94a07f076e0e264387c28303755e4e53f2b2cb0e Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Fri, 4 Oct 2024 12:35:17 -0300 Subject: [PATCH 2/7] TestSequencerReorgFromDelayed --- arbnode/delayed_seq_reorg_test.go | 209 ++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 arbnode/delayed_seq_reorg_test.go diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go new file mode 100644 index 0000000000..78e7fe42ce --- /dev/null +++ b/arbnode/delayed_seq_reorg_test.go @@ -0,0 +1,209 @@ +// Copyright 2021-2022, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbnode + +import ( + "context" + "encoding/binary" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/solgen/go/bridgegen" +) + +func TestSequencerReorgFromDelayed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) + tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) + Require(t, err) + + err = streamer.Start(ctx) + Require(t, err) + exec.Start(ctx) + init, err := streamer.GetMessage(0) + Require(t, err) + + initMsgDelayed := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: [32]byte{}, + Message: init.Message, + } + delayedRequestId := common.BigToHash(common.Big1) + userDelayed := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: 0, + RequestId: &delayedRequestId, + L1BaseFee: common.Big0, + }, + }, + } + delayedRequestId2 := common.BigToHash(common.Big2) + userDelayed2 := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: userDelayed.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: 0, + RequestId: &delayedRequestId2, + L1BaseFee: common.Big0, + }, + }, + } + err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed, userDelayed2}) + Require(t, err) + + serializedInitMsgBatch := make([]byte, 40) + binary.BigEndian.PutUint64(serializedInitMsgBatch[32:], 1) + initMsgBatch := &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 0, + BeforeInboxAcc: [32]byte{}, + AfterInboxAcc: [32]byte{1}, + AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(), + AfterDelayedCount: 1, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedInitMsgBatch, + } + serializedUserMsgBatch := make([]byte, 40) + binary.BigEndian.PutUint64(serializedUserMsgBatch[32:], 2) + userMsgBatch := &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 1, + BeforeInboxAcc: [32]byte{1}, + AfterInboxAcc: [32]byte{2}, + AfterDelayedAcc: userDelayed2.AfterInboxAcc(), + AfterDelayedCount: 3, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedUserMsgBatch, + } + emptyBatch := &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 2, + BeforeInboxAcc: [32]byte{2}, + AfterInboxAcc: [32]byte{3}, + AfterDelayedAcc: userDelayed2.AfterInboxAcc(), + AfterDelayedCount: 3, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedUserMsgBatch, + } + err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch}) + Require(t, err) + + msgCount, err := streamer.GetMessageCount() + Require(t, err) + if msgCount != 3 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 3)") + } + + delayedCount, err := tracker.GetDelayedCount() + Require(t, err) + if delayedCount != 3 { + Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)") + } + + // By modifying the timestamp of the userDelayed message, and adding it again, we remove userDelayed2 message. + userDelayedModified := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: userDelayed.Message.Header.Timestamp + 1, + RequestId: &delayedRequestId, + L1BaseFee: common.Big0, + }, + }, + } + err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayedModified}) + Require(t, err) + + // userMsgBatch, and emptyBatch will be deleted since their AfterDelayedAcc are not valid anymore after the reorg + msgCount, err = streamer.GetMessageCount() + Require(t, err) + if msgCount != 1 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 1)") + } + + // userDelayed2 will be deleted since its AfterDelayedAcc is not valid anymore after the reorg + delayedCount, err = tracker.GetDelayedCount() + Require(t, err) + if delayedCount != 2 { + Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 2)") + } + + // guarantees that delayed msg 1 is userDelayedModified and not userDelayed + msg, err := tracker.GetDelayedMessage(ctx, 1) + Require(t, err) + if (*msg.Header.RequestId).Cmp(*userDelayedModified.Message.Header.RequestId) != 0 { + Fail(t, "Unexpected delayed message requestId", msg.Header.RequestId, "(expected", userDelayedModified.Message.Header.RequestId, ")") + } + if msg.Header.Timestamp != userDelayedModified.Message.Header.Timestamp { + Fail(t, "Unexpected delayed message timestamp", msg.Header.Timestamp, "(expected", userDelayedModified.Message.Header.Timestamp, ")") + } + if userDelayedModified.Message.Header.Timestamp == userDelayed.Message.Header.Timestamp { + Fail(t, "Unexpected delayed message timestamp", userDelayedModified.Message.Header.Timestamp, "(expected", userDelayed.Message.Header.Timestamp, ")") + } + + batchCount, err := tracker.GetBatchCount() + Require(t, err) + if batchCount != 1 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)") + } + + emptyBatch = &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 1, + BeforeInboxAcc: [32]byte{1}, + AfterInboxAcc: [32]byte{2}, + AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(), + AfterDelayedCount: 1, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedInitMsgBatch, + } + err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch}) + Require(t, err) + + msgCount, err = streamer.GetMessageCount() + Require(t, err) + if msgCount != 2 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 2)") + } + + batchCount, err = tracker.GetBatchCount() + Require(t, err) + if batchCount != 2 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 2)") + } +} From 176ffb3fe03770d0909a60d3827bf4170bcf56cf Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Fri, 4 Oct 2024 13:00:22 -0300 Subject: [PATCH 3/7] Fix lint issue --- arbnode/delayed_seq_reorg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 78e7fe42ce..ae7775cda9 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -162,7 +162,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { // guarantees that delayed msg 1 is userDelayedModified and not userDelayed msg, err := tracker.GetDelayedMessage(ctx, 1) Require(t, err) - if (*msg.Header.RequestId).Cmp(*userDelayedModified.Message.Header.RequestId) != 0 { + if msg.Header.RequestId.Cmp(*userDelayedModified.Message.Header.RequestId) != 0 { Fail(t, "Unexpected delayed message requestId", msg.Header.RequestId, "(expected", userDelayedModified.Message.Header.RequestId, ")") } if msg.Header.Timestamp != userDelayedModified.Message.Header.Timestamp { From 4f8f8621fde2bc793b860ea1ab7a8827e314214c Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Fri, 4 Oct 2024 16:04:58 -0300 Subject: [PATCH 4/7] Improves TestSequencerReorgFromDelayed --- arbnode/delayed_seq_reorg_test.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index ae7775cda9..17f4756a53 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -127,7 +127,13 @@ func TestSequencerReorgFromDelayed(t *testing.T) { Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)") } - // By modifying the timestamp of the userDelayed message, and adding it again, we remove userDelayed2 message. + batchCount, err := tracker.GetBatchCount() + Require(t, err) + if batchCount != 3 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 3)") + } + + // By modifying the timestamp of the userDelayed message, and adding it again, we cause a reorg userDelayedModified := &DelayedInboxMessage{ BlockHash: [32]byte{}, BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(), @@ -145,14 +151,20 @@ func TestSequencerReorgFromDelayed(t *testing.T) { err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayedModified}) Require(t, err) - // userMsgBatch, and emptyBatch will be deleted since their AfterDelayedAcc are not valid anymore after the reorg + // userMsgBatch, and emptyBatch will be reorged out msgCount, err = streamer.GetMessageCount() Require(t, err) if msgCount != 1 { Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 1)") } - // userDelayed2 will be deleted since its AfterDelayedAcc is not valid anymore after the reorg + batchCount, err = tracker.GetBatchCount() + Require(t, err) + if batchCount != 1 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)") + } + + // userDelayed2 will be deleted delayedCount, err = tracker.GetDelayedCount() Require(t, err) if delayedCount != 2 { @@ -172,12 +184,6 @@ func TestSequencerReorgFromDelayed(t *testing.T) { Fail(t, "Unexpected delayed message timestamp", userDelayedModified.Message.Header.Timestamp, "(expected", userDelayed.Message.Header.Timestamp, ")") } - batchCount, err := tracker.GetBatchCount() - Require(t, err) - if batchCount != 1 { - Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)") - } - emptyBatch = &SequencerInboxBatch{ BlockHash: [32]byte{}, ParentChainBlockNumber: 0, From fa075bff1b3818e5bab8f7fe69c6fe140ceb178e Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 9 Oct 2024 10:48:50 -0300 Subject: [PATCH 5/7] TestSequencerReorgFromLastDelayedMsg --- arbnode/delayed_seq_reorg_test.go | 200 ++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 17f4756a53..86506a7f62 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -213,3 +213,203 @@ func TestSequencerReorgFromDelayed(t *testing.T) { Fail(t, "Unexpected tracker batch count", batchCount, "(expected 2)") } } + +func TestSequencerReorgFromLastDelayedMsg(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) + tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) + Require(t, err) + + err = streamer.Start(ctx) + Require(t, err) + exec.Start(ctx) + init, err := streamer.GetMessage(0) + Require(t, err) + + initMsgDelayed := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: [32]byte{}, + Message: init.Message, + } + delayedRequestId := common.BigToHash(common.Big1) + userDelayed := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: 0, + RequestId: &delayedRequestId, + L1BaseFee: common.Big0, + }, + }, + } + delayedRequestId2 := common.BigToHash(common.Big2) + userDelayed2 := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: userDelayed.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: 0, + RequestId: &delayedRequestId2, + L1BaseFee: common.Big0, + }, + }, + } + err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed, userDelayed2}) + Require(t, err) + + serializedInitMsgBatch := make([]byte, 40) + binary.BigEndian.PutUint64(serializedInitMsgBatch[32:], 1) + initMsgBatch := &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 0, + BeforeInboxAcc: [32]byte{}, + AfterInboxAcc: [32]byte{1}, + AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(), + AfterDelayedCount: 1, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedInitMsgBatch, + } + serializedUserMsgBatch := make([]byte, 40) + binary.BigEndian.PutUint64(serializedUserMsgBatch[32:], 2) + userMsgBatch := &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 1, + BeforeInboxAcc: [32]byte{1}, + AfterInboxAcc: [32]byte{2}, + AfterDelayedAcc: userDelayed2.AfterInboxAcc(), + AfterDelayedCount: 3, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedUserMsgBatch, + } + emptyBatch := &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 2, + BeforeInboxAcc: [32]byte{2}, + AfterInboxAcc: [32]byte{3}, + AfterDelayedAcc: userDelayed2.AfterInboxAcc(), + AfterDelayedCount: 3, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedUserMsgBatch, + } + err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch}) + Require(t, err) + + msgCount, err := streamer.GetMessageCount() + Require(t, err) + if msgCount != 3 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 3)") + } + + delayedCount, err := tracker.GetDelayedCount() + Require(t, err) + if delayedCount != 3 { + Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)") + } + + batchCount, err := tracker.GetBatchCount() + Require(t, err) + if batchCount != 3 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 3)") + } + + // By modifying the timestamp of the userDelayed2 message, and adding it again, we cause a reorg + userDelayed2Modified := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: userDelayed.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: userDelayed2.Message.Header.Timestamp + 1, + RequestId: &delayedRequestId2, + L1BaseFee: common.Big0, + }, + }, + } + err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayed2Modified}) + Require(t, err) + + // FAILS HERE + // userMsgBatch, and emptyBatch will be reorged out + msgCount, err = streamer.GetMessageCount() + Require(t, err) + if msgCount != 1 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 1)") + } + + batchCount, err = tracker.GetBatchCount() + Require(t, err) + if batchCount != 1 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)") + } + + delayedCount, err = tracker.GetDelayedCount() + Require(t, err) + if delayedCount != 3 { + Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)") + } + + // guarantees that delayed msg 2 is userDelayedModified and not userDelayed + msg, err := tracker.GetDelayedMessage(ctx, 2) + Require(t, err) + if msg.Header.RequestId.Cmp(*userDelayed2Modified.Message.Header.RequestId) != 0 { + Fail(t, "Unexpected delayed message requestId", msg.Header.RequestId, "(expected", userDelayed2Modified.Message.Header.RequestId, ")") + } + if msg.Header.Timestamp != userDelayed2Modified.Message.Header.Timestamp { + Fail(t, "Unexpected delayed message timestamp", msg.Header.Timestamp, "(expected", userDelayed2Modified.Message.Header.Timestamp, ")") + } + if userDelayed2Modified.Message.Header.Timestamp == userDelayed2.Message.Header.Timestamp { + Fail(t, "Unexpected delayed message timestamp", userDelayed2Modified.Message.Header.Timestamp, "(expected", userDelayed2.Message.Header.Timestamp, ")") + } + + emptyBatch = &SequencerInboxBatch{ + BlockHash: [32]byte{}, + ParentChainBlockNumber: 0, + SequenceNumber: 1, + BeforeInboxAcc: [32]byte{1}, + AfterInboxAcc: [32]byte{2}, + AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(), + AfterDelayedCount: 1, + TimeBounds: bridgegen.IBridgeTimeBounds{}, + rawLog: types.Log{}, + dataLocation: 0, + bridgeAddress: [20]byte{}, + serialized: serializedInitMsgBatch, + } + err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch}) + Require(t, err) + + msgCount, err = streamer.GetMessageCount() + Require(t, err) + if msgCount != 2 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 2)") + } + + batchCount, err = tracker.GetBatchCount() + Require(t, err) + if batchCount != 2 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 2)") + } +} From 80bfa33f29555b73fca061906768c399f71916a0 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 22 Oct 2024 09:38:23 -0500 Subject: [PATCH 6/7] Fix setDelayedCountReorgAndWriteBatch --- arbnode/inbox_tracker.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 0eed2f5e1a..ffd7ed9ab8 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -462,6 +462,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage) error } } + firstPos := pos batch := t.db.NewBatch() for _, message := range messages { seqNum, err := message.Message.Header.SeqNum() @@ -504,13 +505,16 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage) error pos++ } - return t.setDelayedCountReorgAndWriteBatch(batch, pos, true) + return t.setDelayedCountReorgAndWriteBatch(batch, firstPos, pos, true) } // All-in-one delayed message count adjuster. Can go forwards or backwards. // Requires the mutex is held. Sets the delayed count and performs any sequencer batch reorg necessary. // Also deletes any future delayed messages. -func (t *InboxTracker) setDelayedCountReorgAndWriteBatch(batch ethdb.Batch, newDelayedCount uint64, canReorgBatches bool) error { +func (t *InboxTracker) setDelayedCountReorgAndWriteBatch(batch ethdb.Batch, firstNewDelayedMessagePos uint64, newDelayedCount uint64, canReorgBatches bool) error { + if firstNewDelayedMessagePos > newDelayedCount { + return fmt.Errorf("firstNewDelayedMessagePos %v is after newDelayedCount %v", firstNewDelayedMessagePos, newDelayedCount) + } err := deleteStartingAt(t.db, batch, rlpDelayedMessagePrefix, uint64ToKey(newDelayedCount)) if err != nil { return err @@ -533,7 +537,7 @@ func (t *InboxTracker) setDelayedCountReorgAndWriteBatch(batch ethdb.Batch, newD return err } - seqBatchIter := t.db.NewIterator(delayedSequencedPrefix, uint64ToKey(newDelayedCount+1)) + seqBatchIter := t.db.NewIterator(delayedSequencedPrefix, uint64ToKey(firstNewDelayedMessagePos+1)) defer seqBatchIter.Release() var reorgSeqBatchesToCount *uint64 for seqBatchIter.Next() { @@ -865,7 +869,7 @@ func (t *InboxTracker) ReorgDelayedTo(count uint64) error { return errors.New("attempted to reorg to future delayed count") } - return t.setDelayedCountReorgAndWriteBatch(t.db.NewBatch(), count, false) + return t.setDelayedCountReorgAndWriteBatch(t.db.NewBatch(), count, count, false) } func (t *InboxTracker) ReorgBatchesTo(count uint64) error { From 241aed530dd4de0592ddc5f626a204211402ed96 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 22 Oct 2024 20:04:10 -0500 Subject: [PATCH 7/7] Address PR comment and update test --- arbnode/delayed_seq_reorg_test.go | 33 +++++++++++++++++++++++++++++-- arbnode/inbox_tracker.go | 16 +++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 86506a7f62..6cec1e0904 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -333,6 +333,37 @@ func TestSequencerReorgFromLastDelayedMsg(t *testing.T) { Fail(t, "Unexpected tracker batch count", batchCount, "(expected 3)") } + // Adding an already existing message alongside a new one shouldn't cause a reorg + delayedRequestId3 := common.BigToHash(common.Big3) + userDelayed3 := &DelayedInboxMessage{ + BlockHash: [32]byte{}, + BeforeInboxAcc: userDelayed2.AfterInboxAcc(), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_EndOfBlock, + Poster: [20]byte{}, + BlockNumber: 0, + Timestamp: 0, + RequestId: &delayedRequestId3, + L1BaseFee: common.Big0, + }, + }, + } + err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayed2, userDelayed3}) + Require(t, err) + + msgCount, err = streamer.GetMessageCount() + Require(t, err) + if msgCount != 3 { + Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 3)") + } + + batchCount, err = tracker.GetBatchCount() + Require(t, err) + if batchCount != 3 { + Fail(t, "Unexpected tracker batch count", batchCount, "(expected 3)") + } + // By modifying the timestamp of the userDelayed2 message, and adding it again, we cause a reorg userDelayed2Modified := &DelayedInboxMessage{ BlockHash: [32]byte{}, @@ -351,8 +382,6 @@ func TestSequencerReorgFromLastDelayedMsg(t *testing.T) { err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayed2Modified}) Require(t, err) - // FAILS HERE - // userMsgBatch, and emptyBatch will be reorged out msgCount, err = streamer.GetMessageCount() Require(t, err) if msgCount != 1 { diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index ffd7ed9ab8..04b60924d5 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -479,6 +479,22 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage) error } nextAcc = message.AfterInboxAcc() + if firstPos == pos { + // Check if this message is a duplicate + haveAcc, err := t.GetDelayedAcc(seqNum) + if err == nil { + if haveAcc == nextAcc { + // Skip this message, as we already have it in our database + pos++ + firstPos++ + messages = messages[1:] + continue + } + } else if !errors.Is(err, AccumulatorNotFoundErr) { + return err + } + } + delayedMsgKey := dbKey(rlpDelayedMessagePrefix, seqNum) msgData, err := rlp.EncodeToBytes(message.Message)