Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Config Change] Removes HardReorg config from InboxReader #2542

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
282 changes: 272 additions & 10 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,22 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
},
},
}
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed}, false)
delayedRequestId2 := common.BigToHash(common.Big2)
userDelayed2 := &DelayedInboxMessage{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really think it would improve the readability of these tests if we made a little helper method that could return a DelayedInboxMessage built from just the important arguments from the test case. For example, I think the only thing that matters here is that the first message uses delayedRequestId and the second one uses dealyedRequestId2, but I have to actually scan all of the lines between 38-51 and between 53-66 to be sure that something else doesn't differ between the two messages.

Also, if it weren't for your comment on line 137, I wouldn't have noticed that the timestamp was different. But, that would have been really clear if we used a helper function to get a "normal" DelayedInboxMessage and then modified the fields on the struct we wanted to change for the test.

BlockHash: [32]byte{},
BeforeInboxAcc: userDelayed.AfterInboxAcc(),
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_EndOfBlock,
Poster: [20]byte{},
BlockNumber: 0,
Timestamp: 0,
RequestId: &delayedRequestId2,
L1BaseFee: common.Big0,
},
},
}
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed, userDelayed2})
Require(t, err)

serializedInitMsgBatch := make([]byte, 40)
Expand All @@ -75,8 +90,8 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
SequenceNumber: 1,
BeforeInboxAcc: [32]byte{1},
AfterInboxAcc: [32]byte{2},
AfterDelayedAcc: userDelayed.AfterInboxAcc(),
AfterDelayedCount: 2,
AfterDelayedAcc: userDelayed2.AfterInboxAcc(),
AfterDelayedCount: 3,
TimeBounds: bridgegen.IBridgeTimeBounds{},
rawLog: types.Log{},
dataLocation: 0,
Expand All @@ -89,8 +104,8 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
SequenceNumber: 2,
BeforeInboxAcc: [32]byte{2},
AfterInboxAcc: [32]byte{3},
AfterDelayedAcc: userDelayed.AfterInboxAcc(),
AfterDelayedCount: 2,
AfterDelayedAcc: userDelayed2.AfterInboxAcc(),
AfterDelayedCount: 3,
TimeBounds: bridgegen.IBridgeTimeBounds{},
rawLog: types.Log{},
dataLocation: 0,
Expand All @@ -100,28 +115,275 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch})
Require(t, err)

// Reorg out the user delayed message
err = tracker.ReorgDelayedTo(1, true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of just removing this test, we should instead have this test insert an alternative delayed message at position 1 which would cause the same reorg. Instead of getting the delayed count below, we could then get the delayed message to ensure it changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adjusted the test to trigger a reorg by adding an alternative delayed message through AddDelayedMessages as you mentioned, but the strategy that I implemented is a little bit different than the one you described.

msgCount, err := streamer.GetMessageCount()
Require(t, err)
if msgCount != 3 {
Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 3)")
}

delayedCount, err := tracker.GetDelayedCount()
Require(t, err)
if delayedCount != 3 {
Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)")
}

msgCount, err := streamer.GetMessageCount()
batchCount, err := tracker.GetBatchCount()
Require(t, err)
if batchCount != 3 {
Fail(t, "Unexpected tracker batch count", batchCount, "(expected 3)")
}

// By modifying the timestamp of the userDelayed message, and adding it again, we cause a reorg
userDelayedModified := &DelayedInboxMessage{
BlockHash: [32]byte{},
BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(),
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_EndOfBlock,
Poster: [20]byte{},
BlockNumber: 0,
Timestamp: userDelayed.Message.Header.Timestamp + 1,
RequestId: &delayedRequestId,
L1BaseFee: common.Big0,
},
},
}
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayedModified})
joshuacolvin0 marked this conversation as resolved.
Show resolved Hide resolved
Require(t, err)

// userMsgBatch, and emptyBatch will be reorged out
msgCount, err = streamer.GetMessageCount()
Require(t, err)
if msgCount != 1 {
Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 1)")
}

batchCount, err = tracker.GetBatchCount()
Require(t, err)
if batchCount != 1 {
Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)")
}

// userDelayed2 will be deleted
delayedCount, err = tracker.GetDelayedCount()
Require(t, err)
if delayedCount != 2 {
Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 2)")
}

// guarantees that delayed msg 1 is userDelayedModified and not userDelayed
msg, err := tracker.GetDelayedMessage(ctx, 1)
Require(t, err)
if msg.Header.RequestId.Cmp(*userDelayedModified.Message.Header.RequestId) != 0 {
Fail(t, "Unexpected delayed message requestId", msg.Header.RequestId, "(expected", userDelayedModified.Message.Header.RequestId, ")")
}
if msg.Header.Timestamp != userDelayedModified.Message.Header.Timestamp {
Fail(t, "Unexpected delayed message timestamp", msg.Header.Timestamp, "(expected", userDelayedModified.Message.Header.Timestamp, ")")
}
if userDelayedModified.Message.Header.Timestamp == userDelayed.Message.Header.Timestamp {
Fail(t, "Unexpected delayed message timestamp", userDelayedModified.Message.Header.Timestamp, "(expected", userDelayed.Message.Header.Timestamp, ")")
}

emptyBatch = &SequencerInboxBatch{
BlockHash: [32]byte{},
ParentChainBlockNumber: 0,
SequenceNumber: 1,
BeforeInboxAcc: [32]byte{1},
AfterInboxAcc: [32]byte{2},
AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(),
AfterDelayedCount: 1,
TimeBounds: bridgegen.IBridgeTimeBounds{},
rawLog: types.Log{},
dataLocation: 0,
bridgeAddress: [20]byte{},
serialized: serializedInitMsgBatch,
}
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch})
Require(t, err)

msgCount, err = streamer.GetMessageCount()
Require(t, err)
if msgCount != 2 {
Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 2)")
}

batchCount, err = tracker.GetBatchCount()
Require(t, err)
if batchCount != 2 {
Fail(t, "Unexpected tracker batch count", batchCount, "(expected 2)")
}
}

func TestSequencerReorgFromLastDelayedMsg(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx)
Require(t, err)
exec.Start(ctx)
init, err := streamer.GetMessage(0)
Require(t, err)

initMsgDelayed := &DelayedInboxMessage{
BlockHash: [32]byte{},
BeforeInboxAcc: [32]byte{},
Message: init.Message,
}
delayedRequestId := common.BigToHash(common.Big1)
userDelayed := &DelayedInboxMessage{
BlockHash: [32]byte{},
BeforeInboxAcc: initMsgDelayed.AfterInboxAcc(),
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_EndOfBlock,
Poster: [20]byte{},
BlockNumber: 0,
Timestamp: 0,
RequestId: &delayedRequestId,
L1BaseFee: common.Big0,
},
},
}
delayedRequestId2 := common.BigToHash(common.Big2)
userDelayed2 := &DelayedInboxMessage{
BlockHash: [32]byte{},
BeforeInboxAcc: userDelayed.AfterInboxAcc(),
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_EndOfBlock,
Poster: [20]byte{},
BlockNumber: 0,
Timestamp: 0,
RequestId: &delayedRequestId2,
L1BaseFee: common.Big0,
},
},
}
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed, userDelayed2})
Require(t, err)

serializedInitMsgBatch := make([]byte, 40)
binary.BigEndian.PutUint64(serializedInitMsgBatch[32:], 1)
initMsgBatch := &SequencerInboxBatch{
BlockHash: [32]byte{},
ParentChainBlockNumber: 0,
SequenceNumber: 0,
BeforeInboxAcc: [32]byte{},
AfterInboxAcc: [32]byte{1},
AfterDelayedAcc: initMsgDelayed.AfterInboxAcc(),
AfterDelayedCount: 1,
TimeBounds: bridgegen.IBridgeTimeBounds{},
rawLog: types.Log{},
dataLocation: 0,
bridgeAddress: [20]byte{},
serialized: serializedInitMsgBatch,
}
serializedUserMsgBatch := make([]byte, 40)
binary.BigEndian.PutUint64(serializedUserMsgBatch[32:], 2)
userMsgBatch := &SequencerInboxBatch{
BlockHash: [32]byte{},
ParentChainBlockNumber: 0,
SequenceNumber: 1,
BeforeInboxAcc: [32]byte{1},
AfterInboxAcc: [32]byte{2},
AfterDelayedAcc: userDelayed2.AfterInboxAcc(),
AfterDelayedCount: 3,
TimeBounds: bridgegen.IBridgeTimeBounds{},
rawLog: types.Log{},
dataLocation: 0,
bridgeAddress: [20]byte{},
serialized: serializedUserMsgBatch,
}
emptyBatch := &SequencerInboxBatch{
BlockHash: [32]byte{},
ParentChainBlockNumber: 0,
SequenceNumber: 2,
BeforeInboxAcc: [32]byte{2},
AfterInboxAcc: [32]byte{3},
AfterDelayedAcc: userDelayed2.AfterInboxAcc(),
AfterDelayedCount: 3,
TimeBounds: bridgegen.IBridgeTimeBounds{},
rawLog: types.Log{},
dataLocation: 0,
bridgeAddress: [20]byte{},
serialized: serializedUserMsgBatch,
}
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch})
Require(t, err)

msgCount, err := streamer.GetMessageCount()
Require(t, err)
if msgCount != 3 {
Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 3)")
}

delayedCount, err := tracker.GetDelayedCount()
Require(t, err)
if delayedCount != 1 {
Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 1)")
if delayedCount != 3 {
Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)")
}

batchCount, err := tracker.GetBatchCount()
Require(t, err)
if batchCount != 3 {
Fail(t, "Unexpected tracker batch count", batchCount, "(expected 3)")
}

// By modifying the timestamp of the userDelayed2 message, and adding it again, we cause a reorg
userDelayed2Modified := &DelayedInboxMessage{
BlockHash: [32]byte{},
BeforeInboxAcc: userDelayed.AfterInboxAcc(),
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_EndOfBlock,
Poster: [20]byte{},
BlockNumber: 0,
Timestamp: userDelayed2.Message.Header.Timestamp + 1,
RequestId: &delayedRequestId2,
L1BaseFee: common.Big0,
},
},
}
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{userDelayed2Modified})
Require(t, err)

// FAILS HERE
// userMsgBatch, and emptyBatch will be reorged out
msgCount, err = streamer.GetMessageCount()
Require(t, err)
if msgCount != 1 {
Fail(t, "Unexpected tx streamer message count", msgCount, "(expected 1)")
}

batchCount, err = tracker.GetBatchCount()
Require(t, err)
if batchCount != 1 {
Fail(t, "Unexpected tracker batch count", batchCount, "(expected 1)")
}

delayedCount, err = tracker.GetDelayedCount()
Require(t, err)
if delayedCount != 3 {
Fail(t, "Unexpected tracker delayed message count", delayedCount, "(expected 3)")
}

// guarantees that delayed msg 2 is userDelayedModified and not userDelayed
msg, err := tracker.GetDelayedMessage(ctx, 2)
Require(t, err)
if msg.Header.RequestId.Cmp(*userDelayed2Modified.Message.Header.RequestId) != 0 {
Fail(t, "Unexpected delayed message requestId", msg.Header.RequestId, "(expected", userDelayed2Modified.Message.Header.RequestId, ")")
}
if msg.Header.Timestamp != userDelayed2Modified.Message.Header.Timestamp {
Fail(t, "Unexpected delayed message timestamp", msg.Header.Timestamp, "(expected", userDelayed2Modified.Message.Header.Timestamp, ")")
}
if userDelayed2Modified.Message.Header.Timestamp == userDelayed2.Message.Header.Timestamp {
Fail(t, "Unexpected delayed message timestamp", userDelayed2Modified.Message.Header.Timestamp, "(expected", userDelayed2.Message.Header.Timestamp, ")")
}

emptyBatch = &SequencerInboxBatch{
BlockHash: [32]byte{},
ParentChainBlockNumber: 0,
Expand Down
13 changes: 2 additions & 11 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
type InboxReaderConfig struct {
DelayBlocks uint64 `koanf:"delay-blocks" reload:"hot"`
CheckDelay time.Duration `koanf:"check-delay" reload:"hot"`
HardReorg bool `koanf:"hard-reorg" reload:"hot"`
MinBlocksToRead uint64 `koanf:"min-blocks-to-read" reload:"hot"`
DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"`
TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"`
Expand All @@ -51,7 +50,6 @@ func (c *InboxReaderConfig) Validate() error {
func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".delay-blocks", DefaultInboxReaderConfig.DelayBlocks, "number of latest blocks to ignore to reduce reorgs")
f.Duration(prefix+".check-delay", DefaultInboxReaderConfig.CheckDelay, "the maximum time to wait between inbox checks (if not enough new blocks are found)")
f.Bool(prefix+".hard-reorg", DefaultInboxReaderConfig.HardReorg, "erase future transactions in addition to overwriting existing ones on reorg")
f.Uint64(prefix+".min-blocks-to-read", DefaultInboxReaderConfig.MinBlocksToRead, "the minimum number of blocks to read at once (when caught up lowers load on L1)")
f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)")
f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once")
Expand All @@ -62,7 +60,6 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
var DefaultInboxReaderConfig = InboxReaderConfig{
DelayBlocks: 0,
CheckDelay: time.Minute,
HardReorg: false,
MinBlocksToRead: 1,
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
Expand All @@ -73,7 +70,6 @@ var DefaultInboxReaderConfig = InboxReaderConfig{
var TestInboxReaderConfig = InboxReaderConfig{
DelayBlocks: 0,
CheckDelay: time.Millisecond * 10,
HardReorg: false,
MinBlocksToRead: 1,
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
Expand Down Expand Up @@ -338,7 +334,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
missingDelayed = true
} else if ourLatestDelayedCount > checkingDelayedCount {
log.Info("backwards reorg of delayed messages", "from", ourLatestDelayedCount, "to", checkingDelayedCount)
err = r.tracker.ReorgDelayedTo(checkingDelayedCount, config.HardReorg)
err = r.tracker.ReorgDelayedTo(checkingDelayedCount)
if err != nil {
return err
}
Expand Down Expand Up @@ -373,11 +369,6 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if ourLatestBatchCount < checkingBatchCount {
checkingBatchCount = ourLatestBatchCount
missingSequencer = true
} else if ourLatestBatchCount > checkingBatchCount && config.HardReorg {
err = r.tracker.ReorgBatchesTo(checkingBatchCount)
if err != nil {
return err
}
}
if checkingBatchCount > 0 {
checkingBatchSeqNum := checkingBatchCount - 1
Expand Down Expand Up @@ -566,7 +557,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*SequencerInboxBatch, delayedMessages []*DelayedInboxMessage) (bool, error) {
err := r.tracker.AddDelayedMessages(delayedMessages, r.config().HardReorg)
err := r.tracker.AddDelayedMessages(delayedMessages)
if err != nil {
return false, err
}
Expand Down
Loading