From 8d903c5997dec617961212271ca20bd9f2578603 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 2 May 2024 15:34:55 -0300 Subject: [PATCH 01/30] mv MessageWithMetadataAndBlockHash to arbostypes --- arbnode/transaction_streamer.go | 14 +++++++------- arbos/arbostypes/messagewithmeta.go | 5 +++++ broadcaster/broadcaster.go | 7 +------ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 0d5ae829b0..b8b35186b2 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -465,9 +465,9 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde return err } - messagesWithBlockHash := make([]broadcaster.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) + messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) for i := 0; i < len(messagesResults); i++ { - messagesWithBlockHash = append(messagesWithBlockHash, broadcaster.MessageWithMetadataAndBlockHash{ + messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ Message: newMessages[i], BlockHash: &messagesResults[i].BlockHash, }) @@ -1011,11 +1011,11 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( return err } - msgWithBlockHash := broadcaster.MessageWithMetadataAndBlockHash{ + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ Message: msgWithMeta, BlockHash: &msgResult.BlockHash, } - s.broadcastMessages([]broadcaster.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) return nil } @@ -1046,7 +1046,7 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty } func (s *TransactionStreamer) broadcastMessages( - msgs []broadcaster.MessageWithMetadataAndBlockHash, + msgs []arbostypes.MessageWithMetadataAndBlockHash, pos arbutil.MessageIndex, ) { if s.broadcastServer == nil { @@ -1145,11 +1145,11 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution return false } - msgWithBlockHash := broadcaster.MessageWithMetadataAndBlockHash{ + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ Message: *msg, BlockHash: &msgResult.BlockHash, } - s.broadcastMessages([]broadcaster.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) return pos+1 < msgCount } diff --git a/arbos/arbostypes/messagewithmeta.go b/arbos/arbostypes/messagewithmeta.go index a3d4f5e3c3..e1215e0dd5 100644 --- a/arbos/arbostypes/messagewithmeta.go +++ b/arbos/arbostypes/messagewithmeta.go @@ -18,6 +18,11 @@ type MessageWithMetadata struct { DelayedMessagesRead uint64 `json:"delayedMessagesRead"` } +type MessageWithMetadataAndBlockHash struct { + Message MessageWithMetadata + BlockHash *common.Hash +} + var EmptyTestMessageWithMetadata = MessageWithMetadata{ Message: &EmptyTestIncomingMessage, } diff --git a/broadcaster/broadcaster.go b/broadcaster/broadcaster.go index ac5c6c39da..da1de6665e 100644 --- a/broadcaster/broadcaster.go +++ b/broadcaster/broadcaster.go @@ -22,11 +22,6 @@ import ( "github.com/offchainlabs/nitro/wsbroadcastserver" ) -type MessageWithMetadataAndBlockHash struct { - Message arbostypes.MessageWithMetadata - BlockHash *common.Hash -} - type Broadcaster struct { server *wsbroadcastserver.WSBroadcastServer backlog backlog.Backlog @@ -98,7 +93,7 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) { } func (b *Broadcaster) BroadcastMessages( - messagesWithBlockHash []MessageWithMetadataAndBlockHash, + messagesWithBlockHash []arbostypes.MessageWithMetadataAndBlockHash, seq arbutil.MessageIndex, ) (err error) { defer func() { From 1f99ca984c2c3ac9c6f0af11680fe3ee756a7746 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 2 May 2024 17:09:03 -0300 Subject: [PATCH 02/30] pass MessageWithMetadataAndBlockHashes to writeMessages --- arbnode/inbox_test.go | 20 ++++--- arbnode/inbox_tracker.go | 9 ++- arbnode/seq_coordinator.go | 7 ++- arbnode/transaction_streamer.go | 77 ++++++++++++++----------- execution/gethexec/executionengine.go | 6 +- execution/gethexec/node.go | 2 +- execution/interface.go | 2 +- system_tests/contract_tx_test.go | 26 +++++---- system_tests/reorg_resequencing_test.go | 8 ++- system_tests/seq_coordinator_test.go | 5 +- 10 files changed, 92 insertions(+), 70 deletions(-) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 5c879743a4..fbd1dba96a 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -128,7 +128,7 @@ func TestTransactionStreamer(t *testing.T) { } state.balances = newBalances - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash // TODO replay a random amount of messages too numMessages := rand.Int() % 5 for j := 0; j < numMessages; j++ { @@ -154,16 +154,18 @@ func TestTransactionStreamer(t *testing.T) { l2Message = append(l2Message, arbmath.U256Bytes(value)...) var requestId common.Hash binary.BigEndian.PutUint64(requestId.Bytes()[:8], uint64(i)) - messages = append(messages, arbostypes.MessageWithMetadata{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: source, - RequestId: &requestId, + messages = append(messages, arbostypes.MessageWithMetadataAndBlockHash{ + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: source, + RequestId: &requestId, + }, + L2msg: l2Message, }, - L2msg: l2Message, + DelayedMessagesRead: 1, }, - DelayedMessagesRead: 1, }) state.balances[source].Sub(state.balances[source], value) if state.balances[dest] == nil { diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index ba1b875ec8..e2aa1d5e74 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -652,7 +652,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L pos++ } - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash backend := &multiplexerBackend{ batchSeqNum: batches[0].SequenceNumber, batches: batches, @@ -673,7 +673,10 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L if err != nil { return err } - messages = append(messages, *msg) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: *msg, + } + messages = append(messages, msgWithBlockHash) batchMessageCounts[batchSeqNum] = currentpos currentpos += 1 } @@ -733,7 +736,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L } var latestTimestamp uint64 if len(messages) > 0 { - latestTimestamp = messages[len(messages)-1].Message.Header.Timestamp + latestTimestamp = messages[len(messages)-1].Message.Message.Header.Timestamp } log.Info( "InboxTracker", diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index ecf38ddf42..0a27d89d40 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -533,7 +533,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { if readUntil > localMsgCount+c.config.MsgPerPoll { readUntil = localMsgCount + c.config.MsgPerPoll } - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash msgToRead := localMsgCount var msgReadErr error for msgToRead < readUntil { @@ -592,7 +592,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { DelayedMessagesRead: lastDelayedMsg, } } - messages = append(messages, message) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: message, + } + messages = append(messages, msgWithBlockHash) msgToRead++ } if len(messages) > 0 { diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index b8b35186b2..411eba965d 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -60,7 +60,7 @@ type TransactionStreamer struct { nextAllowedFeedReorgLog time.Time - broadcasterQueuedMessages []arbostypes.MessageWithMetadata + broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash broadcasterQueuedMessagesPos uint64 broadcasterQueuedMessagesActiveReorg bool @@ -371,7 +371,7 @@ func deleteFromRange(ctx context.Context, db ethdb.Database, prefix []byte, star // The insertion mutex must be held. This acquires the reorg mutex. // Note: oldMessages will be empty if reorgHook is nil -func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata) error { +func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash) error { if count == 0 { return errors.New("cannot reorg out init message") } @@ -465,14 +465,14 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde return err } - messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) + messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) for i := 0; i < len(messagesResults); i++ { - messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ - Message: newMessages[i], + messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ + Message: newMessages[i].Message, BlockHash: &messagesResults[i].BlockHash, }) } - s.broadcastMessages(messagesWithBlockHash, count) + s.broadcastMessages(messagesWithComputedBlockHash, count) if s.validator != nil { err = s.validator.Reorg(s.GetContext(), count) @@ -555,7 +555,7 @@ func (s *TransactionStreamer) GetProcessedMessageCount() (arbutil.MessageIndex, return msgCount, nil } -func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata) error { +func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash) error { return s.AddMessagesAndEndBatch(pos, messagesAreConfirmed, messages, nil) } @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe return nil } broadcastStartPos := feedMessages[0].SequenceNumber - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash broadcastAfterPos := broadcastStartPos for _, feedMessage := range feedMessages { if broadcastAfterPos != feedMessage.SequenceNumber { @@ -588,7 +588,11 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if feedMessage.Message.Message == nil || feedMessage.Message.Message.Header == nil { return fmt.Errorf("invalid feed message at sequence number %v", feedMessage.SequenceNumber) } - messages = append(messages, feedMessage.Message) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: feedMessage.Message, + BlockHash: feedMessage.BlockHash, + } + messages = append(messages, msgWithBlockHash) broadcastAfterPos++ } @@ -607,7 +611,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe messages = messages[dups:] broadcastStartPos += arbutil.MessageIndex(dups) if oldMsg != nil { - s.logReorg(broadcastStartPos, oldMsg, &messages[0], false) + s.logReorg(broadcastStartPos, oldMsg, &messages[0].Message, false) } if len(messages) == 0 { // No new messages received @@ -681,16 +685,19 @@ func (s *TransactionStreamer) AddFakeInitMessage() error { } chainIdBytes := arbmath.U256Bytes(s.chainConfig.ChainID) msg := append(append(chainIdBytes, 0), chainConfigJson...) - return s.AddMessages(0, false, []arbostypes.MessageWithMetadata{{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_Initialize, - RequestId: &common.Hash{}, - L1BaseFee: common.Big0, + return s.AddMessages(0, false, []arbostypes.MessageWithMetadataAndBlockHash{{ + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_Initialize, + RequestId: &common.Hash{}, + L1BaseFee: common.Big0, + }, + L2msg: msg, }, - L2msg: msg, + DelayedMessagesRead: 1, }, - DelayedMessagesRead: 1, + BlockHash: nil, }}) } @@ -708,7 +715,7 @@ func endBatch(batch ethdb.Batch) error { return batch.Write() } -func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { if messagesAreConfirmed { // Trim confirmed messages from l1pricedataCache s.TrimCache(pos + arbutil.MessageIndex(len(messages))) @@ -748,7 +755,7 @@ func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) ( func (s *TransactionStreamer) countDuplicateMessages( pos arbutil.MessageIndex, - messages []arbostypes.MessageWithMetadata, + messages []arbostypes.MessageWithMetadataAndBlockHash, batch *ethdb.Batch, ) (int, bool, *arbostypes.MessageWithMetadata, error) { curMsg := 0 @@ -768,7 +775,7 @@ func (s *TransactionStreamer) countDuplicateMessages( if err != nil { return 0, false, nil, err } - nextMessage := messages[curMsg] + nextMessage := messages[curMsg].Message wantMessage, err := rlp.EncodeToBytes(nextMessage) if err != nil { return 0, false, nil, err @@ -842,7 +849,7 @@ func (s *TransactionStreamer) logReorg(pos arbutil.MessageIndex, dbMsg *arbostyp } -func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { var confirmedReorg bool var oldMsg *arbostypes.MessageWithMetadata var lastDelayedRead uint64 @@ -860,7 +867,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return err } if duplicates > 0 { - lastDelayedRead = messages[duplicates-1].DelayedMessagesRead + lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead messages = messages[duplicates:] messageStartPos += arbutil.MessageIndex(duplicates) } @@ -898,13 +905,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return err } if duplicates > 0 { - lastDelayedRead = messages[duplicates-1].DelayedMessagesRead + lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead messages = messages[duplicates:] messageStartPos += arbutil.MessageIndex(duplicates) } } if oldMsg != nil { - s.logReorg(messageStartPos, oldMsg, &messages[0], confirmedReorg) + s.logReorg(messageStartPos, oldMsg, &messages[0].Message, confirmedReorg) } if feedReorg { @@ -924,12 +931,12 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil // Validate delayed message counts of remaining messages for i, msg := range messages { msgPos := messageStartPos + arbutil.MessageIndex(i) - diff := msg.DelayedMessagesRead - lastDelayedRead + diff := msg.Message.DelayedMessagesRead - lastDelayedRead if diff != 0 && diff != 1 { - return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.DelayedMessagesRead, msgPos) + return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.Message.DelayedMessagesRead, msgPos) } - lastDelayedRead = msg.DelayedMessagesRead - if msg.Message == nil { + lastDelayedRead = msg.Message.DelayedMessagesRead + if msg.Message.Message == nil { return fmt.Errorf("attempted to insert nil message at position %v", msgPos) } } @@ -1007,14 +1014,14 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( } } - if err := s.writeMessages(pos, []arbostypes.MessageWithMetadata{msgWithMeta}, nil); err != nil { - return err - } - msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ Message: msgWithMeta, BlockHash: &msgResult.BlockHash, } + + if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil { + return err + } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) return nil @@ -1059,12 +1066,12 @@ func (s *TransactionStreamer) broadcastMessages( // The mutex must be held, and pos must be the latest message count. // `batch` may be nil, which initializes a new batch. The batch is closed out in this function. -func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { if batch == nil { batch = s.db.NewBatch() } for i, msg := range messages { - err := s.writeMessage(pos+arbutil.MessageIndex(i), msg, batch) + err := s.writeMessage(pos+arbutil.MessageIndex(i), msg.Message, batch) if err != nil { return err } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 38569f44ab..c4fbc04712 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -116,7 +116,7 @@ func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher { return s.consensus } -func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { +func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { if count == 0 { return nil, errors.New("cannot reorg out genesis") } @@ -149,9 +149,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost for i := range newMessages { var msgForPrefetch *arbostypes.MessageWithMetadata if i < len(newMessages)-1 { - msgForPrefetch = &newMessages[i] + msgForPrefetch = &newMessages[i].Message } - msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch) + msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].Message, msgForPrefetch) if err != nil { return nil, err } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index ae76b88530..458d6601c5 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -346,7 +346,7 @@ func (n *ExecutionNode) StopAndWait() { func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) { return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch) } -func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { +func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { return n.ExecEngine.Reorg(count, newMessages, oldMessages) } func (n *ExecutionNode) HeadMessageNumber() (arbutil.MessageIndex, error) { diff --git a/execution/interface.go b/execution/interface.go index d2a5b58fe5..66aefe9a5e 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -31,7 +31,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken") // always needed type ExecutionClient interface { DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*MessageResult, error) - Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error) + Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error) HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error) diff --git a/system_tests/contract_tx_test.go b/system_tests/contract_tx_test.go index 7d66e516b4..d0f7b153f3 100644 --- a/system_tests/contract_tx_test.go +++ b/system_tests/contract_tx_test.go @@ -69,21 +69,23 @@ func TestContractTxDeploy(t *testing.T) { l2Msg = append(l2Msg, arbmath.U256Bytes(contractTx.Value)...) l2Msg = append(l2Msg, contractTx.Data...) - err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadata{ + err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadataAndBlockHash{ { - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: from, - BlockNumber: 0, - Timestamp: 0, - RequestId: &contractTx.RequestId, - L1BaseFee: &big.Int{}, + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: from, + BlockNumber: 0, + Timestamp: 0, + RequestId: &contractTx.RequestId, + L1BaseFee: &big.Int{}, + }, + L2msg: l2Msg, + BatchGasCost: new(uint64), }, - L2msg: l2Msg, - BatchGasCost: new(uint64), + DelayedMessagesRead: delayedMessagesRead, }, - DelayedMessagesRead: delayedMessagesRead, }, }) Require(t, err) diff --git a/system_tests/reorg_resequencing_test.go b/system_tests/reorg_resequencing_test.go index b188504acb..6d5ecd5e6a 100644 --- a/system_tests/reorg_resequencing_test.go +++ b/system_tests/reorg_resequencing_test.go @@ -72,9 +72,11 @@ func TestReorgResequencing(t *testing.T) { }, L2msg: append(builder.L2Info.GetAddress("User4").Bytes(), arbmath.Uint64ToU256Bytes(params.Ether)...), } - err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadata{{ - Message: newMessage, - DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, + err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadataAndBlockHash{{ + Message: arbostypes.MessageWithMetadata{ + Message: newMessage, + DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, + }, }}) Require(t, err) diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 886a0528c7..5e539a8812 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -91,7 +91,10 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { return false } Require(t, err) - Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadata{emptyMessage})) + emptyMessageWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: emptyMessage, + } + Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadataAndBlockHash{emptyMessageWithBlockHash})) return true } From bf3c9602d822c50277b705c46e22fc151e72fa78 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 2 May 2024 17:21:20 -0300 Subject: [PATCH 03/30] rename Message to MessageWithMeta in MessageWithMetadataAndBlockHash --- arbnode/inbox_test.go | 2 +- arbnode/inbox_tracker.go | 4 +-- arbnode/seq_coordinator.go | 2 +- arbnode/transaction_streamer.go | 38 ++++++++++++------------- arbos/arbostypes/messagewithmeta.go | 4 +-- broadcaster/broadcaster.go | 2 +- execution/gethexec/executionengine.go | 4 +-- system_tests/contract_tx_test.go | 2 +- system_tests/reorg_resequencing_test.go | 2 +- system_tests/seq_coordinator_test.go | 2 +- 10 files changed, 31 insertions(+), 31 deletions(-) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index fbd1dba96a..a5d1554cb1 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -155,7 +155,7 @@ func TestTransactionStreamer(t *testing.T) { var requestId common.Hash binary.BigEndian.PutUint64(requestId.Bytes()[:8], uint64(i)) messages = append(messages, arbostypes.MessageWithMetadataAndBlockHash{ - Message: arbostypes.MessageWithMetadata{ + MessageWithMeta: arbostypes.MessageWithMetadata{ Message: &arbostypes.L1IncomingMessage{ Header: &arbostypes.L1IncomingMessageHeader{ Kind: arbostypes.L1MessageType_L2Message, diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index e2aa1d5e74..2340df8303 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -674,7 +674,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L return err } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - Message: *msg, + MessageWithMeta: *msg, } messages = append(messages, msgWithBlockHash) batchMessageCounts[batchSeqNum] = currentpos @@ -736,7 +736,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L } var latestTimestamp uint64 if len(messages) > 0 { - latestTimestamp = messages[len(messages)-1].Message.Message.Header.Timestamp + latestTimestamp = messages[len(messages)-1].MessageWithMeta.Message.Header.Timestamp } log.Info( "InboxTracker", diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 0a27d89d40..2fb8c3244b 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -593,7 +593,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - Message: message, + MessageWithMeta: message, } messages = append(messages, msgWithBlockHash) msgToRead++ diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 411eba965d..708dcff41b 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -468,8 +468,8 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) for i := 0; i < len(messagesResults); i++ { messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ - Message: newMessages[i].Message, - BlockHash: &messagesResults[i].BlockHash, + MessageWithMeta: newMessages[i].MessageWithMeta, + BlockHash: &messagesResults[i].BlockHash, }) } s.broadcastMessages(messagesWithComputedBlockHash, count) @@ -589,8 +589,8 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe return fmt.Errorf("invalid feed message at sequence number %v", feedMessage.SequenceNumber) } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - Message: feedMessage.Message, - BlockHash: feedMessage.BlockHash, + MessageWithMeta: feedMessage.Message, + BlockHash: feedMessage.BlockHash, } messages = append(messages, msgWithBlockHash) broadcastAfterPos++ @@ -611,7 +611,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe messages = messages[dups:] broadcastStartPos += arbutil.MessageIndex(dups) if oldMsg != nil { - s.logReorg(broadcastStartPos, oldMsg, &messages[0].Message, false) + s.logReorg(broadcastStartPos, oldMsg, &messages[0].MessageWithMeta, false) } if len(messages) == 0 { // No new messages received @@ -686,7 +686,7 @@ func (s *TransactionStreamer) AddFakeInitMessage() error { chainIdBytes := arbmath.U256Bytes(s.chainConfig.ChainID) msg := append(append(chainIdBytes, 0), chainConfigJson...) return s.AddMessages(0, false, []arbostypes.MessageWithMetadataAndBlockHash{{ - Message: arbostypes.MessageWithMetadata{ + MessageWithMeta: arbostypes.MessageWithMetadata{ Message: &arbostypes.L1IncomingMessage{ Header: &arbostypes.L1IncomingMessageHeader{ Kind: arbostypes.L1MessageType_Initialize, @@ -775,7 +775,7 @@ func (s *TransactionStreamer) countDuplicateMessages( if err != nil { return 0, false, nil, err } - nextMessage := messages[curMsg].Message + nextMessage := messages[curMsg].MessageWithMeta wantMessage, err := rlp.EncodeToBytes(nextMessage) if err != nil { return 0, false, nil, err @@ -867,7 +867,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return err } if duplicates > 0 { - lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead + lastDelayedRead = messages[duplicates-1].MessageWithMeta.DelayedMessagesRead messages = messages[duplicates:] messageStartPos += arbutil.MessageIndex(duplicates) } @@ -905,13 +905,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return err } if duplicates > 0 { - lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead + lastDelayedRead = messages[duplicates-1].MessageWithMeta.DelayedMessagesRead messages = messages[duplicates:] messageStartPos += arbutil.MessageIndex(duplicates) } } if oldMsg != nil { - s.logReorg(messageStartPos, oldMsg, &messages[0].Message, confirmedReorg) + s.logReorg(messageStartPos, oldMsg, &messages[0].MessageWithMeta, confirmedReorg) } if feedReorg { @@ -931,12 +931,12 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil // Validate delayed message counts of remaining messages for i, msg := range messages { msgPos := messageStartPos + arbutil.MessageIndex(i) - diff := msg.Message.DelayedMessagesRead - lastDelayedRead + diff := msg.MessageWithMeta.DelayedMessagesRead - lastDelayedRead if diff != 0 && diff != 1 { - return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.Message.DelayedMessagesRead, msgPos) + return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.MessageWithMeta.DelayedMessagesRead, msgPos) } - lastDelayedRead = msg.Message.DelayedMessagesRead - if msg.Message.Message == nil { + lastDelayedRead = msg.MessageWithMeta.DelayedMessagesRead + if msg.MessageWithMeta.Message == nil { return fmt.Errorf("attempted to insert nil message at position %v", msgPos) } } @@ -1015,8 +1015,8 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - Message: msgWithMeta, - BlockHash: &msgResult.BlockHash, + MessageWithMeta: msgWithMeta, + BlockHash: &msgResult.BlockHash, } if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil { @@ -1071,7 +1071,7 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [ batch = s.db.NewBatch() } for i, msg := range messages { - err := s.writeMessage(pos+arbutil.MessageIndex(i), msg.Message, batch) + err := s.writeMessage(pos+arbutil.MessageIndex(i), msg.MessageWithMeta, batch) if err != nil { return err } @@ -1153,8 +1153,8 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - Message: *msg, - BlockHash: &msgResult.BlockHash, + MessageWithMeta: *msg, + BlockHash: &msgResult.BlockHash, } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) diff --git a/arbos/arbostypes/messagewithmeta.go b/arbos/arbostypes/messagewithmeta.go index e1215e0dd5..79b7c4f9d2 100644 --- a/arbos/arbostypes/messagewithmeta.go +++ b/arbos/arbostypes/messagewithmeta.go @@ -19,8 +19,8 @@ type MessageWithMetadata struct { } type MessageWithMetadataAndBlockHash struct { - Message MessageWithMetadata - BlockHash *common.Hash + MessageWithMeta MessageWithMetadata + BlockHash *common.Hash } var EmptyTestMessageWithMetadata = MessageWithMetadata{ diff --git a/broadcaster/broadcaster.go b/broadcaster/broadcaster.go index da1de6665e..ba95f2d8af 100644 --- a/broadcaster/broadcaster.go +++ b/broadcaster/broadcaster.go @@ -104,7 +104,7 @@ func (b *Broadcaster) BroadcastMessages( }() var feedMessages []*m.BroadcastFeedMessage for i, msg := range messagesWithBlockHash { - bfm, err := b.NewBroadcastFeedMessage(msg.Message, seq+arbutil.MessageIndex(i), msg.BlockHash) + bfm, err := b.NewBroadcastFeedMessage(msg.MessageWithMeta, seq+arbutil.MessageIndex(i), msg.BlockHash) if err != nil { return err } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index c4fbc04712..b31209b882 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -149,9 +149,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost for i := range newMessages { var msgForPrefetch *arbostypes.MessageWithMetadata if i < len(newMessages)-1 { - msgForPrefetch = &newMessages[i].Message + msgForPrefetch = &newMessages[i].MessageWithMeta } - msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].Message, msgForPrefetch) + msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].MessageWithMeta, msgForPrefetch) if err != nil { return nil, err } diff --git a/system_tests/contract_tx_test.go b/system_tests/contract_tx_test.go index d0f7b153f3..c4ae326df1 100644 --- a/system_tests/contract_tx_test.go +++ b/system_tests/contract_tx_test.go @@ -71,7 +71,7 @@ func TestContractTxDeploy(t *testing.T) { err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadataAndBlockHash{ { - Message: arbostypes.MessageWithMetadata{ + MessageWithMeta: arbostypes.MessageWithMetadata{ Message: &arbostypes.L1IncomingMessage{ Header: &arbostypes.L1IncomingMessageHeader{ Kind: arbostypes.L1MessageType_L2Message, diff --git a/system_tests/reorg_resequencing_test.go b/system_tests/reorg_resequencing_test.go index 6d5ecd5e6a..28d1b3bd66 100644 --- a/system_tests/reorg_resequencing_test.go +++ b/system_tests/reorg_resequencing_test.go @@ -73,7 +73,7 @@ func TestReorgResequencing(t *testing.T) { L2msg: append(builder.L2Info.GetAddress("User4").Bytes(), arbmath.Uint64ToU256Bytes(params.Ether)...), } err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadataAndBlockHash{{ - Message: arbostypes.MessageWithMetadata{ + MessageWithMeta: arbostypes.MessageWithMetadata{ Message: newMessage, DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, }, diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 5e539a8812..36c7be7251 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -92,7 +92,7 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { } Require(t, err) emptyMessageWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - Message: emptyMessage, + MessageWithMeta: emptyMessage, } Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadataAndBlockHash{emptyMessageWithBlockHash})) return true From 44e44c7a10a642fe66eac3159e0712038e57f45f Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 2 May 2024 17:59:42 -0300 Subject: [PATCH 04/30] write block hash alongsing with message with metadata --- arbnode/schema.go | 1 + arbnode/transaction_streamer.go | 31 +++++++++++++++++++++---------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/arbnode/schema.go b/arbnode/schema.go index ddc7cf54fd..2854b7e785 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -5,6 +5,7 @@ package arbnode var ( messagePrefix []byte = []byte("m") // maps a message sequence number to a message + blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 708dcff41b..5debe0c41f 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -775,8 +775,8 @@ func (s *TransactionStreamer) countDuplicateMessages( if err != nil { return 0, false, nil, err } - nextMessage := messages[curMsg].MessageWithMeta - wantMessage, err := rlp.EncodeToBytes(nextMessage) + nextMessage := messages[curMsg] + wantMessage, err := rlp.EncodeToBytes(nextMessage.MessageWithMeta) if err != nil { return 0, false, nil, err } @@ -792,12 +792,12 @@ func (s *TransactionStreamer) countDuplicateMessages( return curMsg, true, nil, nil } var duplicateMessage bool - if nextMessage.Message != nil { - if dbMessageParsed.Message.BatchGasCost == nil || nextMessage.Message.BatchGasCost == nil { + if nextMessage.MessageWithMeta.Message != nil { + if dbMessageParsed.Message.BatchGasCost == nil || nextMessage.MessageWithMeta.Message.BatchGasCost == nil { // Remove both of the batch gas costs and see if the messages still differ - nextMessageCopy := nextMessage + nextMessageCopy := nextMessage.MessageWithMeta nextMessageCopy.Message = new(arbostypes.L1IncomingMessage) - *nextMessageCopy.Message = *nextMessage.Message + *nextMessageCopy.Message = *nextMessage.MessageWithMeta.Message batchGasCostBkup := dbMessageParsed.Message.BatchGasCost dbMessageParsed.Message.BatchGasCost = nil nextMessageCopy.Message.BatchGasCost = nil @@ -805,7 +805,7 @@ func (s *TransactionStreamer) countDuplicateMessages( // Actually this isn't a reorg; only the batch gas costs differed duplicateMessage = true // If possible - update the message in the database to add the gas cost cache. - if batch != nil && nextMessage.Message.BatchGasCost != nil { + if batch != nil && nextMessage.MessageWithMeta.Message.BatchGasCost != nil { if *batch == nil { *batch = s.db.NewBatch() } @@ -1043,9 +1043,20 @@ func (s *TransactionStreamer) PopulateFeedBacklog() error { return s.inboxReader.tracker.PopulateFeedBacklog(s.broadcastServer) } -func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { + // write message with metadata key := dbKey(messagePrefix, uint64(pos)) - msgBytes, err := rlp.EncodeToBytes(msg) + msgBytes, err := rlp.EncodeToBytes(msg.MessageWithMeta) + if err != nil { + return err + } + if err := batch.Put(key, msgBytes); err != nil { + return err + } + + // write block hash + key = dbKey(blockHashInputFeedPrefix, uint64(pos)) + msgBytes, err = rlp.EncodeToBytes(msg.BlockHash) if err != nil { return err } @@ -1071,7 +1082,7 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [ batch = s.db.NewBatch() } for i, msg := range messages { - err := s.writeMessage(pos+arbutil.MessageIndex(i), msg.MessageWithMeta, batch) + err := s.writeMessage(pos+arbutil.MessageIndex(i), msg, batch) if err != nil { return err } From da0b605ad80d65ca5ae00c438a412ccf4af8a968 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 2 May 2024 18:58:59 -0300 Subject: [PATCH 05/30] prune block hashes in db --- arbnode/message_pruner.go | 25 +++++++++++++++++-------- arbnode/message_pruner_test.go | 6 ++++-- arbnode/transaction_streamer.go | 4 ++++ 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go index 31bf1a63ff..c31dbc496d 100644 --- a/arbnode/message_pruner.go +++ b/arbnode/message_pruner.go @@ -23,13 +23,14 @@ import ( type MessagePruner struct { stopwaiter.StopWaiter - transactionStreamer *TransactionStreamer - inboxTracker *InboxTracker - config MessagePrunerConfigFetcher - pruningLock sync.Mutex - lastPruneDone time.Time - cachedPrunedMessages uint64 - cachedPrunedDelayedMessages uint64 + transactionStreamer *TransactionStreamer + inboxTracker *InboxTracker + config MessagePrunerConfigFetcher + pruningLock sync.Mutex + lastPruneDone time.Time + cachedPrunedMessages uint64 + cachedPrunedBlockHashesInputFeed uint64 + cachedPrunedDelayedMessages uint64 } type MessagePrunerConfig struct { @@ -115,7 +116,15 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g } func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error { - prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount)) + prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount)) + if err != nil { + return fmt.Errorf("error deleting last batch messages' block hashes: %w", err) + } + if len(prunedKeysRange) > 0 { + log.Info("Pruned last batch messages' block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1]) + } + + prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount)) if err != nil { return fmt.Errorf("error deleting last batch messages: %w", err) } diff --git a/arbnode/message_pruner_test.go b/arbnode/message_pruner_test.go index 0212ed2364..ed85c0ebce 100644 --- a/arbnode/message_pruner_test.go +++ b/arbnode/message_pruner_test.go @@ -22,8 +22,8 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) { Require(t, err) checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix) + checkDbKeys(t, messagesCount, transactionStreamerDb, blockHashInputFeedPrefix) checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix) - } func TestMessagePrunerTwoHalves(t *testing.T) { @@ -71,16 +71,18 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) { Require(t, err) checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix) + checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, blockHashInputFeedPrefix) checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix) } func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) { - transactionStreamerDb := rawdb.NewMemoryDatabase() for i := uint64(0); i < uint64(messageCount); i++ { err := transactionStreamerDb.Put(dbKey(messagePrefix, i), []byte{}) Require(t, err) + err = transactionStreamerDb.Put(dbKey(blockHashInputFeedPrefix, i), []byte{}) + Require(t, err) } inboxTrackerDb := rawdb.NewMemoryDatabase() diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 5debe0c41f..0d02dc27dc 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -481,6 +481,10 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde } } + err = deleteStartingAt(s.db, batch, blockHashInputFeedPrefix, uint64ToKey(uint64(count))) + if err != nil { + return err + } err = deleteStartingAt(s.db, batch, messagePrefix, uint64ToKey(uint64(count))) if err != nil { return err From 7ad286d12eeeae3eb8a723c6ce2f5d9f3651fef9 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Fri, 3 May 2024 12:37:42 -0300 Subject: [PATCH 06/30] retrieve block hash and compare it when executing next message --- arbnode/transaction_streamer.go | 46 ++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 0d02dc27dc..9da0b3ea17 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -530,6 +530,30 @@ func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostyp return &message, nil } +func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil.MessageIndex) (*arbostypes.MessageWithMetadataAndBlockHash, error) { + msg, err := s.GetMessage(seqNum) + if err != nil { + return nil, err + } + + key := dbKey(blockHashInputFeedPrefix, uint64(seqNum)) + data, err := s.db.Get(key) + if err != nil { + return nil, err + } + var blockHash *common.Hash + err = rlp.DecodeBytes(data, &blockHash) + if err != nil { + return nil, err + } + + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + MessageWithMeta: *msg, + BlockHash: blockHash, + } + return &msgWithBlockHash, nil +} + // Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version. func (s *TransactionStreamer) GetMessageCount() (arbutil.MessageIndex, error) { posBytes, err := s.db.Get(messageCountKey) @@ -1117,6 +1141,20 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut return s.exec.ResultAtPos(count - 1) } +func (s *TransactionStreamer) checkResult(msgResult *execution.MessageResult, expectedBlockHash *common.Hash) { + if expectedBlockHash == nil { + return + } + if msgResult.BlockHash != *expectedBlockHash { + log.Error( + "block_hash_mismatch", + "expected", expectedBlockHash, + "actual", msgResult.BlockHash, + ) + return + } +} + // exposed for testing // return value: true if should be called again immediately func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution.ExecutionSequencer) bool { @@ -1143,7 +1181,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution if pos >= msgCount { return false } - msg, err := s.GetMessage(pos) + msgAndBlockHash, err := s.getMessageWithMetadataAndBlockHash(pos) if err != nil { log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos) return false @@ -1157,7 +1195,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution } msgForPrefetch = msg } - msgResult, err := s.exec.DigestMessage(pos, msg, msgForPrefetch) + msgResult, err := s.exec.DigestMessage(pos, &msgAndBlockHash.MessageWithMeta, msgForPrefetch) if err != nil { logger := log.Warn if prevMessageCount < msgCount { @@ -1167,8 +1205,10 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution return false } + s.checkResult(msgResult, msgAndBlockHash.BlockHash) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - MessageWithMeta: *msg, + MessageWithMeta: msgAndBlockHash.MessageWithMeta, BlockHash: &msgResult.BlockHash, } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) From 95690fbe1f7ce122ca94e474b78eec1b7c15a802 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Fri, 3 May 2024 15:35:53 -0300 Subject: [PATCH 07/30] use MessageWithMetadata instead of MessageWithMetadataAndBlockHash in places where BlockHash will definitely be nil --- arbnode/inbox_test.go | 20 +++++++-------- arbnode/inbox_tracker.go | 9 +++---- arbnode/seq_coordinator.go | 7 ++--- arbnode/transaction_streamer.go | 34 ++++++++++++++----------- system_tests/contract_tx_test.go | 26 +++++++++---------- system_tests/reorg_resequencing_test.go | 8 +++--- system_tests/seq_coordinator_test.go | 5 +--- 7 files changed, 49 insertions(+), 60 deletions(-) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index a5d1554cb1..5c879743a4 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -128,7 +128,7 @@ func TestTransactionStreamer(t *testing.T) { } state.balances = newBalances - var messages []arbostypes.MessageWithMetadataAndBlockHash + var messages []arbostypes.MessageWithMetadata // TODO replay a random amount of messages too numMessages := rand.Int() % 5 for j := 0; j < numMessages; j++ { @@ -154,18 +154,16 @@ func TestTransactionStreamer(t *testing.T) { l2Message = append(l2Message, arbmath.U256Bytes(value)...) var requestId common.Hash binary.BigEndian.PutUint64(requestId.Bytes()[:8], uint64(i)) - messages = append(messages, arbostypes.MessageWithMetadataAndBlockHash{ - MessageWithMeta: arbostypes.MessageWithMetadata{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: source, - RequestId: &requestId, - }, - L2msg: l2Message, + messages = append(messages, arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: source, + RequestId: &requestId, }, - DelayedMessagesRead: 1, + L2msg: l2Message, }, + DelayedMessagesRead: 1, }) state.balances[source].Sub(state.balances[source], value) if state.balances[dest] == nil { diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 2340df8303..ba1b875ec8 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -652,7 +652,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L pos++ } - var messages []arbostypes.MessageWithMetadataAndBlockHash + var messages []arbostypes.MessageWithMetadata backend := &multiplexerBackend{ batchSeqNum: batches[0].SequenceNumber, batches: batches, @@ -673,10 +673,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L if err != nil { return err } - msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - MessageWithMeta: *msg, - } - messages = append(messages, msgWithBlockHash) + messages = append(messages, *msg) batchMessageCounts[batchSeqNum] = currentpos currentpos += 1 } @@ -736,7 +733,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L } var latestTimestamp uint64 if len(messages) > 0 { - latestTimestamp = messages[len(messages)-1].MessageWithMeta.Message.Header.Timestamp + latestTimestamp = messages[len(messages)-1].Message.Header.Timestamp } log.Info( "InboxTracker", diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 2fb8c3244b..ecf38ddf42 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -533,7 +533,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { if readUntil > localMsgCount+c.config.MsgPerPoll { readUntil = localMsgCount + c.config.MsgPerPoll } - var messages []arbostypes.MessageWithMetadataAndBlockHash + var messages []arbostypes.MessageWithMetadata msgToRead := localMsgCount var msgReadErr error for msgToRead < readUntil { @@ -592,10 +592,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { DelayedMessagesRead: lastDelayedMsg, } } - msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - MessageWithMeta: message, - } - messages = append(messages, msgWithBlockHash) + messages = append(messages, message) msgToRead++ } if len(messages) > 0 { diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 9da0b3ea17..7ff565ec16 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -583,7 +583,7 @@ func (s *TransactionStreamer) GetProcessedMessageCount() (arbutil.MessageIndex, return msgCount, nil } -func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash) error { +func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata) error { return s.AddMessagesAndEndBatch(pos, messagesAreConfirmed, messages, nil) } @@ -713,19 +713,16 @@ func (s *TransactionStreamer) AddFakeInitMessage() error { } chainIdBytes := arbmath.U256Bytes(s.chainConfig.ChainID) msg := append(append(chainIdBytes, 0), chainConfigJson...) - return s.AddMessages(0, false, []arbostypes.MessageWithMetadataAndBlockHash{{ - MessageWithMeta: arbostypes.MessageWithMetadata{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_Initialize, - RequestId: &common.Hash{}, - L1BaseFee: common.Big0, - }, - L2msg: msg, + return s.AddMessages(0, false, []arbostypes.MessageWithMetadata{{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_Initialize, + RequestId: &common.Hash{}, + L1BaseFee: common.Big0, }, - DelayedMessagesRead: 1, + L2msg: msg, }, - BlockHash: nil, + DelayedMessagesRead: 1, }}) } @@ -743,12 +740,19 @@ func endBatch(batch ethdb.Batch) error { return batch.Write() } -func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { +func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { + messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messages)) + for _, message := range messages { + messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ + MessageWithMeta: message, + }) + } + if messagesAreConfirmed { // Trim confirmed messages from l1pricedataCache s.TrimCache(pos + arbutil.MessageIndex(len(messages))) s.reorgMutex.RLock() - dups, _, _, err := s.countDuplicateMessages(pos, messages, nil) + dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil) s.reorgMutex.RUnlock() if err != nil { return err @@ -765,7 +769,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m s.insertionMutex.Lock() defer s.insertionMutex.Unlock() - return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messages, batch) + return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockHash, batch) } func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) { diff --git a/system_tests/contract_tx_test.go b/system_tests/contract_tx_test.go index c4ae326df1..7d66e516b4 100644 --- a/system_tests/contract_tx_test.go +++ b/system_tests/contract_tx_test.go @@ -69,23 +69,21 @@ func TestContractTxDeploy(t *testing.T) { l2Msg = append(l2Msg, arbmath.U256Bytes(contractTx.Value)...) l2Msg = append(l2Msg, contractTx.Data...) - err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadataAndBlockHash{ + err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadata{ { - MessageWithMeta: arbostypes.MessageWithMetadata{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: from, - BlockNumber: 0, - Timestamp: 0, - RequestId: &contractTx.RequestId, - L1BaseFee: &big.Int{}, - }, - L2msg: l2Msg, - BatchGasCost: new(uint64), + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: from, + BlockNumber: 0, + Timestamp: 0, + RequestId: &contractTx.RequestId, + L1BaseFee: &big.Int{}, }, - DelayedMessagesRead: delayedMessagesRead, + L2msg: l2Msg, + BatchGasCost: new(uint64), }, + DelayedMessagesRead: delayedMessagesRead, }, }) Require(t, err) diff --git a/system_tests/reorg_resequencing_test.go b/system_tests/reorg_resequencing_test.go index 28d1b3bd66..b188504acb 100644 --- a/system_tests/reorg_resequencing_test.go +++ b/system_tests/reorg_resequencing_test.go @@ -72,11 +72,9 @@ func TestReorgResequencing(t *testing.T) { }, L2msg: append(builder.L2Info.GetAddress("User4").Bytes(), arbmath.Uint64ToU256Bytes(params.Ether)...), } - err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadataAndBlockHash{{ - MessageWithMeta: arbostypes.MessageWithMetadata{ - Message: newMessage, - DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, - }, + err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadata{{ + Message: newMessage, + DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, }}) Require(t, err) diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 36c7be7251..886a0528c7 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -91,10 +91,7 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { return false } Require(t, err) - emptyMessageWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ - MessageWithMeta: emptyMessage, - } - Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadataAndBlockHash{emptyMessageWithBlockHash})) + Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadata{emptyMessage})) return true } From 8671d06f3d69a4a27777ec0370c500601381adac Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 8 May 2024 12:04:22 -0300 Subject: [PATCH 08/30] fix storing and retrieving block hash --- arbnode/transaction_streamer.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 7ff565ec16..0973ca942e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -536,15 +536,24 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. return nil, err } + // get block hash key := dbKey(blockHashInputFeedPrefix, uint64(seqNum)) - data, err := s.db.Get(key) + hasBlockHash, err := s.db.Has(key) if err != nil { return nil, err } var blockHash *common.Hash - err = rlp.DecodeBytes(data, &blockHash) - if err != nil { - return nil, err + if hasBlockHash { + data, err := s.db.Get(key) + if err != nil { + return nil, err + } + var storedBlockHash common.Hash + err = rlp.DecodeBytes(data, &storedBlockHash) + if err != nil { + return nil, err + } + blockHash = &storedBlockHash } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ @@ -1087,8 +1096,13 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty } // write block hash + if msg.BlockHash == nil { + // don't write nil block hash to avoid issues with rlp decoder that + // doesn't produce nil values by default + return nil + } key = dbKey(blockHashInputFeedPrefix, uint64(pos)) - msgBytes, err = rlp.EncodeToBytes(msg.BlockHash) + msgBytes, err = rlp.EncodeToBytes(*msg.BlockHash) if err != nil { return err } From 0fc92af157ceb77d86c0f646d36dd06be3f3105d Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 8 May 2024 12:04:44 -0300 Subject: [PATCH 09/30] check block_hash_mismatch in sequencer system tests --- system_tests/seqfeed_test.go | 85 +++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index 749a91e3b1..f9cca03616 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -11,10 +11,16 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbnode" + "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/broadcastclient" + "github.com/offchainlabs/nitro/broadcaster/backlog" + "github.com/offchainlabs/nitro/broadcaster/message" "github.com/offchainlabs/nitro/relay" "github.com/offchainlabs/nitro/util/signature" + "github.com/offchainlabs/nitro/util/testhelpers" "github.com/offchainlabs/nitro/wsbroadcastserver" ) @@ -38,7 +44,8 @@ func newBroadcastClientConfigTest(port int) *broadcastclient.Config { } func TestSequencerFeed(t *testing.T) { - t.Parallel() + logHandler := testhelpers.InitTestLog(t, log.LvlTrace) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -73,6 +80,10 @@ func TestSequencerFeed(t *testing.T) { if l2balance.Cmp(big.NewInt(1e12)) != 0 { t.Fatal("Unexpected balance:", l2balance) } + + if logHandler.WasLogged("block_hash_mismatch") { + t.Fatal("block_hash_mismatch was logged unexpectedly") + } } func TestRelayedSequencerFeed(t *testing.T) { @@ -250,3 +261,75 @@ func TestLyingSequencer(t *testing.T) { func TestLyingSequencerLocalDAS(t *testing.T) { testLyingSequencer(t, "files") } + +func TestBlockHashFeedMismatch(t *testing.T) { + logHandler := testhelpers.InitTestLog(t, log.LvlTrace) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + backlogConfiFetcher := func() *backlog.Config { + return &backlog.DefaultTestConfig + } + bklg := backlog.NewBacklog(backlogConfiFetcher) + + feedErrChan := make(chan error) + wsBroadcastServer := wsbroadcastserver.NewWSBroadcastServer( + newBroadcasterConfigTest, + bklg, + 412346, + feedErrChan, + ) + err := wsBroadcastServer.Initialize() + if err != nil { + t.Fatal("error initializing wsBroadcastServer:", err) + } + err = wsBroadcastServer.Start(ctx) + if err != nil { + t.Fatal("error starting wsBroadcastServer:", err) + } + defer wsBroadcastServer.StopAndWait() + + port := wsBroadcastServer.ListenerAddr().(*net.TCPAddr).Port + + builder := NewNodeBuilder(ctx).DefaultConfig(t, false) + builder.nodeConfig.Feed.Input = *newBroadcastClientConfigTest(port) + cleanup := builder.Build(t) + defer cleanup() + + poster := common.HexToAddress("0xa4b000000000000000000073657175656e636572") + blockHash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + l2msg := []byte{4, 2, 248, 111, 131, 6, 74, 186, 128, 128, 132, 11, 235, 194, 0, 131, 122, 18, 0, 148, 12, 112, 159, 52, 15, 11, 178, 227, 97, 34, 158, 52, 91, 126, 38, 153, 157, 9, 105, 171, 133, 232, 212, 165, 16, 0, 128, 192, 1, 160, 75, 109, 200, 183, 223, 114, 85, 128, 133, 94, 26, 103, 145, 247, 47, 0, 114, 132, 133, 234, 222, 235, 102, 45, 2, 109, 83, 65, 210, 142, 242, 209, 160, 96, 90, 108, 188, 197, 195, 43, 222, 103, 155, 153, 81, 119, 74, 177, 103, 110, 134, 94, 221, 72, 236, 20, 86, 94, 226, 94, 5, 206, 196, 122, 119} + broadcastMessage := message.BroadcastMessage{ + Version: 1, + Messages: []*message.BroadcastFeedMessage{ + { + // SequenceNumber: 1, + SequenceNumber: 2, + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: poster, + BlockNumber: 29, + Timestamp: 1715136502, + RequestId: nil, + L1BaseFee: big.NewInt(0), + }, + L2msg: l2msg, + }, + DelayedMessagesRead: 1, + }, + BlockHash: &blockHash, + Signature: nil, + }, + }, + } + wsBroadcastServer.Broadcast(&broadcastMessage) + + time.Sleep(time.Second * 2) + + if !logHandler.WasLogged("block_hash_mismatch") { + t.Fatal("Failed to log block_hash_mismatch") + } +} From 202efe1c8b7b8dc19229950d4317e8ca8ba4a1f0 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 8 May 2024 14:30:58 -0300 Subject: [PATCH 10/30] stores block hash even it is nil, which enables to iterate through range of keys to properly prune a range of block hashes from the db --- arbnode/transaction_streamer.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 0973ca942e..9e6ae7d92e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -140,6 +140,12 @@ type L1PriceData struct { currentEstimateOfL1GasPrice uint64 } +// Represents a block's hash in the database. +// Necessary because RLP decoder doesn't produce nil values by default. +type blockHashDBValue struct { + BlockHash *common.Hash `rlp:"nil"` +} + func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { s.cachedL1PriceDataMutex.Lock() defer s.cachedL1PriceDataMutex.Unlock() @@ -548,12 +554,12 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. if err != nil { return nil, err } - var storedBlockHash common.Hash - err = rlp.DecodeBytes(data, &storedBlockHash) + var blockHashDBVal blockHashDBValue + err = rlp.DecodeBytes(data, &blockHashDBVal) if err != nil { return nil, err } - blockHash = &storedBlockHash + blockHash = blockHashDBVal.BlockHash } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ @@ -1096,13 +1102,11 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty } // write block hash - if msg.BlockHash == nil { - // don't write nil block hash to avoid issues with rlp decoder that - // doesn't produce nil values by default - return nil + blockHashDBVal := blockHashDBValue{ + BlockHash: msg.BlockHash, } key = dbKey(blockHashInputFeedPrefix, uint64(pos)) - msgBytes, err = rlp.EncodeToBytes(*msg.BlockHash) + msgBytes, err = rlp.EncodeToBytes(blockHashDBVal) if err != nil { return err } From 2a56769f37d138185a7c92dfc9c0802128761037 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 8 May 2024 15:04:59 -0300 Subject: [PATCH 11/30] add comment on why verifying if block hash key exists in DB before retrieving it --- arbnode/transaction_streamer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 9e6ae7d92e..dc375d642e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -542,7 +542,9 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. return nil, err } - // get block hash + // Get block hash. + // First check if key exists in database so this procedure is backwards compatible + // with databases' snapshots that don't have block hashes stored. key := dbKey(blockHashInputFeedPrefix, uint64(seqNum)) hasBlockHash, err := s.db.Has(key) if err != nil { From b2bb3da34c58597f0045bf077dfdaa7a00571f9e Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 8 May 2024 15:41:35 -0300 Subject: [PATCH 12/30] rm unwanted comment --- system_tests/seqfeed_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index f9cca03616..e56514a6d5 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -304,7 +304,6 @@ func TestBlockHashFeedMismatch(t *testing.T) { Version: 1, Messages: []*message.BroadcastFeedMessage{ { - // SequenceNumber: 1, SequenceNumber: 2, Message: arbostypes.MessageWithMetadata{ Message: &arbostypes.L1IncomingMessage{ From 881e2fd34ec60bd2c922d9871c1080c26c7de3f1 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 9 May 2024 16:23:56 -0300 Subject: [PATCH 13/30] Test that the output feed of a node that isn't the sequencer is properly processed --- system_tests/seq_coordinator_test.go | 31 ++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 886a0528c7..a069a2d5a1 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -8,12 +8,14 @@ import ( "errors" "fmt" "math/big" + "net" "testing" "time" "github.com/go-redis/redis/v8" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos/arbostypes" @@ -21,6 +23,7 @@ import ( "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/testhelpers" ) func initRedisForTest(t *testing.T, ctx context.Context, redisUrl string, nodeNames []string) { @@ -270,6 +273,8 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { } func testCoordinatorMessageSync(t *testing.T, successCase bool) { + logHandler := testhelpers.InitTestLog(t, log.LvlTrace) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -304,16 +309,25 @@ func testCoordinatorMessageSync(t *testing.T, successCase bool) { nodeConfigDup := *builder.nodeConfig builder.nodeConfig = &nodeConfigDup - + builder.nodeConfig.Feed.Output = *newBroadcasterConfigTest() builder.nodeConfig.SeqCoordinator.MyUrl = nodeNames[1] if !successCase { builder.nodeConfig.SeqCoordinator.Signer.ECDSA.AcceptSequencer = false builder.nodeConfig.SeqCoordinator.Signer.ECDSA.AllowedAddresses = []string{builder.L2Info.GetAddress("User2").Hex()} } - testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: builder.nodeConfig}) defer cleanupB() + // Build nodeBOutputFeedReader. + // nodeB doesn't sequence transactions, but adds messages related to them to its output feed. + // nodeBOutputFeedReader reads those messages from this feed and processes them. + // nodeBOutputFeedReader doesn't read messages from L1 since none of the nodes posts to L1. + nodeBPort := testClientB.ConsensusNode.BroadcastServer.ListenerAddr().(*net.TCPAddr).Port + nodeConfigNodeBOutputFeedReader := arbnode.ConfigDefaultL1NonSequencerTest() + nodeConfigNodeBOutputFeedReader.Feed.Input = *newBroadcastClientConfigTest(nodeBPort) + testClientNodeBOutputFeedReader, cleanupNodeBOutputFeedReader := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: nodeConfigNodeBOutputFeedReader}) + defer cleanupNodeBOutputFeedReader() + tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, big.NewInt(1e12), nil) err = builder.L2.Client.SendTransaction(ctx, tx) @@ -330,6 +344,19 @@ func testCoordinatorMessageSync(t *testing.T, successCase bool) { if l2balance.Cmp(big.NewInt(1e12)) != 0 { t.Fatal("Unexpected balance:", l2balance) } + + // check that nodeBOutputFeedReader also processed the transaction + _, err = WaitForTx(ctx, testClientNodeBOutputFeedReader.Client, tx.Hash(), time.Second*5) + Require(t, err) + l2balance, err = testClientNodeBOutputFeedReader.Client.BalanceAt(ctx, builder.L2Info.GetAddress("User2"), nil) + Require(t, err) + if l2balance.Cmp(big.NewInt(1e12)) != 0 { + t.Fatal("Unexpected balance:", l2balance) + } + + if logHandler.WasLogged("block_hash_mismatch") { + t.Fatal("block_hash_mismatch was logged unexpectedly") + } } else { _, err = WaitForTx(ctx, testClientB.Client, tx.Hash(), time.Second) if err == nil { From 0df0c201ce107e666e15f744f6c3d604182c3fb8 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 9 May 2024 19:16:48 -0300 Subject: [PATCH 14/30] improve get block hash from db --- arbnode/transaction_streamer.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index dc375d642e..d9c7fc2163 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -520,6 +520,10 @@ func dbKey(prefix []byte, pos uint64) []byte { return key } +func isErrNotFound(err error) bool { + return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, pebble.ErrNotFound) +} + // Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version. func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostypes.MessageWithMetadata, error) { key := dbKey(messagePrefix, uint64(seqNum)) @@ -543,25 +547,20 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. } // Get block hash. - // First check if key exists in database so this procedure is backwards compatible - // with databases' snapshots that don't have block hashes stored. + // To keep it backwards compatible it is possible that a message related + // to a sequence number exists in the database but the block hash doesn't. key := dbKey(blockHashInputFeedPrefix, uint64(seqNum)) - hasBlockHash, err := s.db.Has(key) - if err != nil { - return nil, err - } var blockHash *common.Hash - if hasBlockHash { - data, err := s.db.Get(key) - if err != nil { - return nil, err - } + data, err := s.db.Get(key) + if err == nil { var blockHashDBVal blockHashDBValue err = rlp.DecodeBytes(data, &blockHashDBVal) if err != nil { return nil, err } blockHash = blockHashDBVal.BlockHash + } else if !isErrNotFound(err) { + return nil, err } msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ @@ -706,7 +705,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if broadcastStartPos > 0 { _, err := s.GetMessage(broadcastStartPos - 1) if err != nil { - if !errors.Is(err, leveldb.ErrNotFound) && !errors.Is(err, pebble.ErrNotFound) { + if !isErrNotFound(err) { return err } // Message before current message doesn't exist in database, so don't add current messages yet From 61c7d376bcbf88bd8694d8383a4281cacecc5400 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 9 May 2024 20:24:10 -0300 Subject: [PATCH 15/30] improve TestBlockHashFeedMismatch --- system_tests/seqfeed_test.go | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index e56514a6d5..2aa64a801d 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -273,12 +273,11 @@ func TestBlockHashFeedMismatch(t *testing.T) { } bklg := backlog.NewBacklog(backlogConfiFetcher) - feedErrChan := make(chan error) wsBroadcastServer := wsbroadcastserver.NewWSBroadcastServer( newBroadcasterConfigTest, bklg, 412346, - feedErrChan, + nil, ) err := wsBroadcastServer.Initialize() if err != nil { @@ -292,11 +291,16 @@ func TestBlockHashFeedMismatch(t *testing.T) { port := wsBroadcastServer.ListenerAddr().(*net.TCPAddr).Port - builder := NewNodeBuilder(ctx).DefaultConfig(t, false) + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) builder.nodeConfig.Feed.Input = *newBroadcastClientConfigTest(port) cleanup := builder.Build(t) defer cleanup() + testClient := builder.L2 + // related to: + // - builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, big.NewInt(1e12), nil) + userAccount := "User2" + txHash := common.HexToHash("0x633f62b463cc0e52d842406995fb590654db40aace77bfca863ba0e8d2290f97") poster := common.HexToAddress("0xa4b000000000000000000073657175656e636572") blockHash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") l2msg := []byte{4, 2, 248, 111, 131, 6, 74, 186, 128, 128, 132, 11, 235, 194, 0, 131, 122, 18, 0, 148, 12, 112, 159, 52, 15, 11, 178, 227, 97, 34, 158, 52, 91, 126, 38, 153, 157, 9, 105, 171, 133, 232, 212, 165, 16, 0, 128, 192, 1, 160, 75, 109, 200, 183, 223, 114, 85, 128, 133, 94, 26, 103, 145, 247, 47, 0, 114, 132, 133, 234, 222, 235, 102, 45, 2, 109, 83, 65, 210, 142, 242, 209, 160, 96, 90, 108, 188, 197, 195, 43, 222, 103, 155, 153, 81, 119, 74, 177, 103, 110, 134, 94, 221, 72, 236, 20, 86, 94, 226, 94, 5, 206, 196, 122, 119} @@ -304,16 +308,16 @@ func TestBlockHashFeedMismatch(t *testing.T) { Version: 1, Messages: []*message.BroadcastFeedMessage{ { - SequenceNumber: 2, + SequenceNumber: 1, Message: arbostypes.MessageWithMetadata{ Message: &arbostypes.L1IncomingMessage{ Header: &arbostypes.L1IncomingMessageHeader{ Kind: arbostypes.L1MessageType_L2Message, Poster: poster, BlockNumber: 29, - Timestamp: 1715136502, + Timestamp: 1715295980, RequestId: nil, - L1BaseFee: big.NewInt(0), + L1BaseFee: nil, }, L2msg: l2msg, }, @@ -326,8 +330,21 @@ func TestBlockHashFeedMismatch(t *testing.T) { } wsBroadcastServer.Broadcast(&broadcastMessage) - time.Sleep(time.Second * 2) + // By now, even though block hash mismatch, the transaction should still be processed + builder.L2Info.GenerateAccount(userAccount) + _, err = WaitForTx(ctx, testClient.Client, txHash, time.Second*15) + if err != nil { + t.Fatal("error waiting for tx:", err) + } + l2balance, err := testClient.Client.BalanceAt(ctx, builder.L2Info.GetAddress(userAccount), nil) + if err != nil { + t.Fatal("error getting balance:", err) + } + if l2balance.Cmp(big.NewInt(1e12)) != 0 { + t.Fatal("Unexpected balance:", l2balance) + } + // check that block hash mismatch if !logHandler.WasLogged("block_hash_mismatch") { t.Fatal("Failed to log block_hash_mismatch") } From 70818c5f559a0acfbc87f6d08920090df351dff9 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Thu, 9 May 2024 20:31:49 -0300 Subject: [PATCH 16/30] add TestBlockHashFeedNil --- system_tests/seqfeed_test.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index 2aa64a801d..946194f17d 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -262,7 +262,7 @@ func TestLyingSequencerLocalDAS(t *testing.T) { testLyingSequencer(t, "files") } -func TestBlockHashFeedMismatch(t *testing.T) { +func testBlockHashComparison(t *testing.T, blockHash *common.Hash, mustMismatch bool) { logHandler := testhelpers.InitTestLog(t, log.LvlTrace) ctx, cancel := context.WithCancel(context.Background()) @@ -302,7 +302,6 @@ func TestBlockHashFeedMismatch(t *testing.T) { userAccount := "User2" txHash := common.HexToHash("0x633f62b463cc0e52d842406995fb590654db40aace77bfca863ba0e8d2290f97") poster := common.HexToAddress("0xa4b000000000000000000073657175656e636572") - blockHash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") l2msg := []byte{4, 2, 248, 111, 131, 6, 74, 186, 128, 128, 132, 11, 235, 194, 0, 131, 122, 18, 0, 148, 12, 112, 159, 52, 15, 11, 178, 227, 97, 34, 158, 52, 91, 126, 38, 153, 157, 9, 105, 171, 133, 232, 212, 165, 16, 0, 128, 192, 1, 160, 75, 109, 200, 183, 223, 114, 85, 128, 133, 94, 26, 103, 145, 247, 47, 0, 114, 132, 133, 234, 222, 235, 102, 45, 2, 109, 83, 65, 210, 142, 242, 209, 160, 96, 90, 108, 188, 197, 195, 43, 222, 103, 155, 153, 81, 119, 74, 177, 103, 110, 134, 94, 221, 72, 236, 20, 86, 94, 226, 94, 5, 206, 196, 122, 119} broadcastMessage := message.BroadcastMessage{ Version: 1, @@ -323,7 +322,7 @@ func TestBlockHashFeedMismatch(t *testing.T) { }, DelayedMessagesRead: 1, }, - BlockHash: &blockHash, + BlockHash: blockHash, Signature: nil, }, }, @@ -344,8 +343,19 @@ func TestBlockHashFeedMismatch(t *testing.T) { t.Fatal("Unexpected balance:", l2balance) } - // check that block hash mismatch - if !logHandler.WasLogged("block_hash_mismatch") { + mismatched := logHandler.WasLogged("block_hash_mismatch") + if mustMismatch && !mismatched { t.Fatal("Failed to log block_hash_mismatch") + } else if !mustMismatch && mismatched { + t.Fatal("block_hash_mismatch was logged unexpectedly") } } + +func TestBlockHashFeedMismatch(t *testing.T) { + blockHash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + testBlockHashComparison(t, &blockHash, true) +} + +func TestBlockHashFeedNil(t *testing.T) { + testBlockHashComparison(t, nil, false) +} From 45180a7e8e4b43592edb75b43bbd32d1e3cc6148 Mon Sep 17 00:00:00 2001 From: Jeremy Date: Tue, 14 May 2024 14:28:17 +0800 Subject: [PATCH 17/30] Update testconstants.go --- arbos/programs/testconstants.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/arbos/programs/testconstants.go b/arbos/programs/testconstants.go index 215b5fb8a7..1ab0e6e93b 100644 --- a/arbos/programs/testconstants.go +++ b/arbos/programs/testconstants.go @@ -1,6 +1,9 @@ // Copyright 2024, Offchain Labs, Inc. // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE +//go:build !wasm +// +build !wasm + package programs // This file exists because cgo isn't allowed in tests From a8254afca62deb6bdcda7533df434093df438734 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 15 May 2024 19:19:25 -0300 Subject: [PATCH 18/30] avoid hardicoding adresses and l2msg in tests --- execution/gethexec/executionengine.go | 4 +-- system_tests/seqfeed_test.go | 41 ++++++++++++++------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index b31209b882..96dca6c63e 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -197,7 +197,7 @@ func (s *ExecutionEngine) NextDelayedMessageNumber() (uint64, error) { return currentHeader.Nonce.Uint64(), nil } -func messageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error) (*arbostypes.L1IncomingMessage, error) { +func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error) (*arbostypes.L1IncomingMessage, error) { var l2Message []byte if len(txes) == 1 && txErrors[0] == nil { txBytes, err := txes[0].MarshalBinary() @@ -368,7 +368,7 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. return nil, nil } - msg, err := messageFromTxes(header, txes, hooks.TxErrors) + msg, err := MessageFromTxes(header, txes, hooks.TxErrors) if err != nil { return nil, err } diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index 946194f17d..ed0398c40e 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -12,12 +12,15 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbos/l1pricing" "github.com/offchainlabs/nitro/broadcastclient" "github.com/offchainlabs/nitro/broadcaster/backlog" "github.com/offchainlabs/nitro/broadcaster/message" + "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/relay" "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/util/testhelpers" @@ -297,41 +300,41 @@ func testBlockHashComparison(t *testing.T, blockHash *common.Hash, mustMismatch defer cleanup() testClient := builder.L2 - // related to: - // - builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, big.NewInt(1e12), nil) userAccount := "User2" - txHash := common.HexToHash("0x633f62b463cc0e52d842406995fb590654db40aace77bfca863ba0e8d2290f97") - poster := common.HexToAddress("0xa4b000000000000000000073657175656e636572") - l2msg := []byte{4, 2, 248, 111, 131, 6, 74, 186, 128, 128, 132, 11, 235, 194, 0, 131, 122, 18, 0, 148, 12, 112, 159, 52, 15, 11, 178, 227, 97, 34, 158, 52, 91, 126, 38, 153, 157, 9, 105, 171, 133, 232, 212, 165, 16, 0, 128, 192, 1, 160, 75, 109, 200, 183, 223, 114, 85, 128, 133, 94, 26, 103, 145, 247, 47, 0, 114, 132, 133, 234, 222, 235, 102, 45, 2, 109, 83, 65, 210, 142, 242, 209, 160, 96, 90, 108, 188, 197, 195, 43, 222, 103, 155, 153, 81, 119, 74, 177, 103, 110, 134, 94, 221, 72, 236, 20, 86, 94, 226, 94, 5, 206, 196, 122, 119} + builder.L2Info.GenerateAccount(userAccount) + tx := builder.L2Info.PrepareTx("Owner", userAccount, builder.L2Info.TransferGas, big.NewInt(1e12), nil) + l1IncomingMsgHeader := arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: l1pricing.BatchPosterAddress, + BlockNumber: 29, + Timestamp: 1715295980, + RequestId: nil, + L1BaseFee: nil, + } + l1IncomingMsg, err := gethexec.MessageFromTxes( + &l1IncomingMsgHeader, + types.Transactions{tx}, + []error{nil}, + ) + Require(t, err) + broadcastMessage := message.BroadcastMessage{ Version: 1, Messages: []*message.BroadcastFeedMessage{ { SequenceNumber: 1, Message: arbostypes.MessageWithMetadata{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: poster, - BlockNumber: 29, - Timestamp: 1715295980, - RequestId: nil, - L1BaseFee: nil, - }, - L2msg: l2msg, - }, + Message: l1IncomingMsg, DelayedMessagesRead: 1, }, BlockHash: blockHash, - Signature: nil, }, }, } wsBroadcastServer.Broadcast(&broadcastMessage) // By now, even though block hash mismatch, the transaction should still be processed - builder.L2Info.GenerateAccount(userAccount) - _, err = WaitForTx(ctx, testClient.Client, txHash, time.Second*15) + _, err = WaitForTx(ctx, testClient.Client, tx.Hash(), time.Second*15) if err != nil { t.Fatal("error waiting for tx:", err) } From b03d7b34eac6f0c4456914ee6b2fbe6ef7ba03a4 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Wed, 15 May 2024 19:39:04 -0300 Subject: [PATCH 19/30] improve log message when computed block hash doesn't match hash provided through input feed --- arbnode/transaction_streamer.go | 10 +++++++--- system_tests/seq_coordinator_test.go | 4 ++-- system_tests/seqfeed_test.go | 6 +++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index d9c7fc2163..b79b1aa963 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -146,6 +146,10 @@ type blockHashDBValue struct { BlockHash *common.Hash `rlp:"nil"` } +const ( + BlockHashMismatchLogMsg = "BlockHash from feed doesn't match locally computed hash. Check feed source." +) + func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 { s.cachedL1PriceDataMutex.Lock() defer s.cachedL1PriceDataMutex.Unlock() @@ -547,8 +551,8 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil. } // Get block hash. - // To keep it backwards compatible it is possible that a message related - // to a sequence number exists in the database but the block hash doesn't. + // To keep it backwards compatible, since it is possible that a message related + // to a sequence number exists in the database, but the block hash doesn't. key := dbKey(blockHashInputFeedPrefix, uint64(seqNum)) var blockHash *common.Hash data, err := s.db.Get(key) @@ -1170,7 +1174,7 @@ func (s *TransactionStreamer) checkResult(msgResult *execution.MessageResult, ex } if msgResult.BlockHash != *expectedBlockHash { log.Error( - "block_hash_mismatch", + BlockHashMismatchLogMsg, "expected", expectedBlockHash, "actual", msgResult.BlockHash, ) diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index a069a2d5a1..43d55f40c9 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -354,8 +354,8 @@ func testCoordinatorMessageSync(t *testing.T, successCase bool) { t.Fatal("Unexpected balance:", l2balance) } - if logHandler.WasLogged("block_hash_mismatch") { - t.Fatal("block_hash_mismatch was logged unexpectedly") + if logHandler.WasLogged(arbnode.BlockHashMismatchLogMsg) { + t.Fatal("BlockHashMismatchLogMsg was logged unexpectedly") } } else { _, err = WaitForTx(ctx, testClientB.Client, tx.Hash(), time.Second) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index ed0398c40e..589a48d3af 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -346,11 +346,11 @@ func testBlockHashComparison(t *testing.T, blockHash *common.Hash, mustMismatch t.Fatal("Unexpected balance:", l2balance) } - mismatched := logHandler.WasLogged("block_hash_mismatch") + mismatched := logHandler.WasLogged(arbnode.BlockHashMismatchLogMsg) if mustMismatch && !mismatched { - t.Fatal("Failed to log block_hash_mismatch") + t.Fatal("Failed to log BlockHashMismatchLogMsg") } else if !mustMismatch && mismatched { - t.Fatal("block_hash_mismatch was logged unexpectedly") + t.Fatal("BlockHashMismatchLogMsg was logged unexpectedly") } } From 4f72ebb168ad60d8868c0de1d82898ff8f33f9a7 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Wed, 15 May 2024 23:45:23 -0500 Subject: [PATCH 20/30] Fix signed saturating math functions --- util/arbmath/math.go | 44 ++++++++----- util/arbmath/math_fuzz_test.go | 112 +++++++++++++++++++++++++++++++++ util/arbmath/math_test.go | 109 ++++++++++++++++++++++++++++++++ 3 files changed, 250 insertions(+), 15 deletions(-) create mode 100644 util/arbmath/math_fuzz_test.go diff --git a/util/arbmath/math.go b/util/arbmath/math.go index 1c11c6ad58..8f93caa87d 100644 --- a/util/arbmath/math.go +++ b/util/arbmath/math.go @@ -74,14 +74,6 @@ func MaxInt[T Number](values ...T) T { return max } -// AbsValue the absolute value of a number -func AbsValue[T Number](value T) T { - if value < 0 { - return -value // never happens for unsigned types - } - return value -} - // Checks if two ints are sufficiently close to one another func Within[T Unsigned](a, b, bound T) bool { min := MinInt(a, b) @@ -267,14 +259,32 @@ func BigFloatMulByUint(multiplicand *big.Float, multiplier uint64) *big.Float { return new(big.Float).Mul(multiplicand, UintToBigFloat(multiplier)) } +func MaxIntValue[T Integer]() T { + allBits := ^T(0) + if allBits < 0 { + // This is a signed integer + return T((uint64(1) << (8*unsafe.Sizeof(allBits) - 1)) - 1) + } + return allBits +} + +func MinIntValue[T Integer]() T { + allBits := ^T(0) + if allBits < 0 { + // This is a signed integer + return T(uint64(1) << ((8 * unsafe.Sizeof(allBits)) - 1)) + } + return 0 +} + // SaturatingAdd add two integers without overflow func SaturatingAdd[T Signed](a, b T) T { sum := a + b if b > 0 && sum < a { - sum = ^T(0) >> 1 + sum = MaxIntValue[T]() } if b < 0 && sum > a { - sum = (^T(0) >> 1) + 1 + sum = MinIntValue[T]() } return sum } @@ -290,7 +300,11 @@ func SaturatingUAdd[T Unsigned](a, b T) T { // SaturatingSub subtract an int64 from another without overflow func SaturatingSub(minuend, subtrahend int64) int64 { - return SaturatingAdd(minuend, -subtrahend) + if subtrahend == math.MinInt64 { + // The absolute value of MinInt64 is one greater than MaxInt64 + return SaturatingAdd(SaturatingAdd(minuend, math.MaxInt64), 1) + } + return SaturatingAdd(minuend, SaturatingNeg(subtrahend)) } // SaturatingUSub subtract an integer from another without underflow @@ -315,9 +329,9 @@ func SaturatingMul[T Signed](a, b T) T { product := a * b if b != 0 && product/b != a { if (a > 0 && b > 0) || (a < 0 && b < 0) { - product = ^T(0) >> 1 + product = MaxIntValue[T]() } else { - product = (^T(0) >> 1) + 1 + product = MinIntValue[T]() } } return product @@ -367,8 +381,8 @@ func SaturatingCastToUint(value *big.Int) uint64 { // Negates an int without underflow func SaturatingNeg[T Signed](value T) T { - if value == ^T(0) { - return (^T(0)) >> 1 + if value < 0 && value == MinIntValue[T]() { + return MaxIntValue[T]() } return -value } diff --git a/util/arbmath/math_fuzz_test.go b/util/arbmath/math_fuzz_test.go new file mode 100644 index 0000000000..6e27f2b70a --- /dev/null +++ b/util/arbmath/math_fuzz_test.go @@ -0,0 +1,112 @@ +// Copyright 2024, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package arbmath + +import ( + "math/big" + "testing" +) + +func toBig[T Signed](a T) *big.Int { + return big.NewInt(int64(a)) +} + +func saturatingBigToInt[T Signed](a *big.Int) T { + // MinIntValue and MaxIntValue are already separately tested + if a.Cmp(toBig(MaxIntValue[T]())) > 0 { + return MaxIntValue[T]() + } + if a.Cmp(toBig(MinIntValue[T]())) < 0 { + return MinIntValue[T]() + } + return T(a.Int64()) +} + +func fuzzSaturatingAdd[T Signed](f *testing.F) { + f.Fuzz(func(t *testing.T, a, b T) { + got := SaturatingAdd(a, b) + expected := saturatingBigToInt[T](new(big.Int).Add(toBig(a), toBig(b))) + if got != expected { + t.Errorf("SaturatingAdd(%v, %v) = %v, expected %v", a, b, got, expected) + } + }) +} + +func fuzzSaturatingMul[T Signed](f *testing.F) { + f.Fuzz(func(t *testing.T, a, b T) { + got := SaturatingMul(a, b) + expected := saturatingBigToInt[T](new(big.Int).Mul(toBig(a), toBig(b))) + if got != expected { + t.Errorf("SaturatingMul(%v, %v) = %v, expected %v", a, b, got, expected) + } + }) +} + +func fuzzSaturatingNeg[T Signed](f *testing.F) { + f.Fuzz(func(t *testing.T, a T) { + got := SaturatingNeg(a) + expected := saturatingBigToInt[T](new(big.Int).Neg(toBig(a))) + if got != expected { + t.Errorf("SaturatingNeg(%v) = %v, expected %v", a, got, expected) + } + }) +} + +func FuzzSaturatingAddInt8(f *testing.F) { + fuzzSaturatingAdd[int8](f) +} + +func FuzzSaturatingAddInt16(f *testing.F) { + fuzzSaturatingAdd[int16](f) +} + +func FuzzSaturatingAddInt32(f *testing.F) { + fuzzSaturatingAdd[int32](f) +} + +func FuzzSaturatingAddInt64(f *testing.F) { + fuzzSaturatingAdd[int64](f) +} + +func FuzzSaturatingSub(f *testing.F) { + f.Fuzz(func(t *testing.T, a, b int64) { + got := SaturatingSub(a, b) + expected := saturatingBigToInt[int64](new(big.Int).Sub(toBig(a), toBig(b))) + if got != expected { + t.Errorf("SaturatingSub(%v, %v) = %v, expected %v", a, b, got, expected) + } + }) +} + +func FuzzSaturatingMulInt8(f *testing.F) { + fuzzSaturatingMul[int8](f) +} + +func FuzzSaturatingMulInt16(f *testing.F) { + fuzzSaturatingMul[int16](f) +} + +func FuzzSaturatingMulInt32(f *testing.F) { + fuzzSaturatingMul[int32](f) +} + +func FuzzSaturatingMulInt64(f *testing.F) { + fuzzSaturatingMul[int64](f) +} + +func FuzzSaturatingNegInt8(f *testing.F) { + fuzzSaturatingNeg[int8](f) +} + +func FuzzSaturatingNegInt16(f *testing.F) { + fuzzSaturatingNeg[int16](f) +} + +func FuzzSaturatingNegInt32(f *testing.F) { + fuzzSaturatingNeg[int32](f) +} + +func FuzzSaturatingNegInt64(f *testing.F) { + fuzzSaturatingNeg[int64](f) +} diff --git a/util/arbmath/math_test.go b/util/arbmath/math_test.go index 2e2f14795a..194d6d7c86 100644 --- a/util/arbmath/math_test.go +++ b/util/arbmath/math_test.go @@ -5,6 +5,7 @@ package arbmath import ( "bytes" + "fmt" "math" "math/rand" "testing" @@ -120,6 +121,114 @@ func TestSlices(t *testing.T) { assert_eq(SliceWithRunoff(data, 7, 8), []uint8{}) } +func testMinMaxValues[T Integer](t *testing.T, min T, max T) { + gotMin := MinIntValue[T]() + if gotMin != min { + Fail(t, "expected min", min, "but got", gotMin) + } + gotMax := MaxIntValue[T]() + if gotMax != max { + Fail(t, "expected max", max, "but got", gotMax) + } +} + +func TestMinMaxValues(t *testing.T) { + testMinMaxValues[uint8](t, 0, math.MaxUint8) + testMinMaxValues[uint16](t, 0, math.MaxUint16) + testMinMaxValues[uint32](t, 0, math.MaxUint32) + testMinMaxValues[uint64](t, 0, math.MaxUint64) + testMinMaxValues[int8](t, math.MinInt8, math.MaxInt8) + testMinMaxValues[int16](t, math.MinInt16, math.MaxInt16) + testMinMaxValues[int32](t, math.MinInt32, math.MaxInt32) + testMinMaxValues[int64](t, math.MinInt64, math.MaxInt64) +} + +func TestSaturatingAdd(t *testing.T) { + tests := []struct { + a, b, expected int64 + }{ + {2, 3, 5}, + {-1, -2, -3}, + {math.MaxInt64, 1, math.MaxInt64}, + {math.MaxInt64, math.MaxInt64, math.MaxInt64}, + {math.MinInt64, -1, math.MinInt64}, + {math.MinInt64, math.MinInt64, math.MinInt64}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%v + %v = %v", tc.a, tc.b, tc.expected), func(t *testing.T) { + sum := SaturatingAdd(int64(tc.a), int64(tc.b)) + if sum != tc.expected { + t.Errorf("SaturatingAdd(%v, %v) = %v; want %v", tc.a, tc.b, sum, tc.expected) + } + }) + } +} + +func TestSaturatingSub(t *testing.T) { + tests := []struct { + a, b, expected int64 + }{ + {5, 3, 2}, + {-3, -2, -1}, + {math.MinInt64, 1, math.MinInt64}, + {math.MinInt64, -1, math.MinInt64 + 1}, + {math.MinInt64, math.MinInt64, 0}, + {0, math.MinInt64, math.MaxInt64}, + } + + for _, tc := range tests { + t.Run("", func(t *testing.T) { + sum := SaturatingSub(int64(tc.a), int64(tc.b)) + if sum != tc.expected { + t.Errorf("SaturatingSub(%v, %v) = %v; want %v", tc.a, tc.b, sum, tc.expected) + } + }) + } +} + +func TestSaturatingMul(t *testing.T) { + tests := []struct { + a, b, expected int64 + }{ + {5, 3, 15}, + {-3, -2, 6}, + {math.MaxInt64, 2, math.MaxInt64}, + {math.MinInt64, 2, math.MinInt64}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%v - %v = %v", tc.a, tc.b, tc.expected), func(t *testing.T) { + sum := SaturatingMul(int64(tc.a), int64(tc.b)) + if sum != tc.expected { + t.Errorf("SaturatingMul(%v, %v) = %v; want %v", tc.a, tc.b, sum, tc.expected) + } + }) + } +} + +func TestSaturatingNeg(t *testing.T) { + tests := []struct { + value int64 + expected int64 + }{ + {0, 0}, + {5, -5}, + {-5, 5}, + {math.MinInt64, math.MaxInt64}, + {math.MaxInt64, math.MinInt64 + 1}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("-%v = %v", tc.value, tc.expected), func(t *testing.T) { + result := SaturatingNeg(tc.value) + if result != tc.expected { + t.Errorf("SaturatingNeg(%v) = %v: expected %v", tc.value, result, tc.expected) + } + }) + } +} + func Fail(t *testing.T, printables ...interface{}) { t.Helper() testhelpers.FailImpl(t, printables...) From bd880e9c340667c08e9fe9cbb9577b8067198016 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 16 May 2024 10:37:55 -0500 Subject: [PATCH 21/30] Limit min/max functions to Signed --- util/arbmath/math.go | 30 ++++++++++-------------------- util/arbmath/math_fuzz_test.go | 10 +++++----- util/arbmath/math_test.go | 20 ++++++++------------ 3 files changed, 23 insertions(+), 37 deletions(-) diff --git a/util/arbmath/math.go b/util/arbmath/math.go index 8f93caa87d..d7a0d1f523 100644 --- a/util/arbmath/math.go +++ b/util/arbmath/math.go @@ -259,32 +259,22 @@ func BigFloatMulByUint(multiplicand *big.Float, multiplier uint64) *big.Float { return new(big.Float).Mul(multiplicand, UintToBigFloat(multiplier)) } -func MaxIntValue[T Integer]() T { - allBits := ^T(0) - if allBits < 0 { - // This is a signed integer - return T((uint64(1) << (8*unsafe.Sizeof(allBits) - 1)) - 1) - } - return allBits +func MaxSignedValue[T Signed]() T { + return T((uint64(1) << (8*unsafe.Sizeof(T(0)) - 1)) - 1) } -func MinIntValue[T Integer]() T { - allBits := ^T(0) - if allBits < 0 { - // This is a signed integer - return T(uint64(1) << ((8 * unsafe.Sizeof(allBits)) - 1)) - } - return 0 +func MinSignedValue[T Signed]() T { + return T(uint64(1) << ((8 * unsafe.Sizeof(T(0))) - 1)) } // SaturatingAdd add two integers without overflow func SaturatingAdd[T Signed](a, b T) T { sum := a + b if b > 0 && sum < a { - sum = MaxIntValue[T]() + sum = MaxSignedValue[T]() } if b < 0 && sum > a { - sum = MinIntValue[T]() + sum = MinSignedValue[T]() } return sum } @@ -329,9 +319,9 @@ func SaturatingMul[T Signed](a, b T) T { product := a * b if b != 0 && product/b != a { if (a > 0 && b > 0) || (a < 0 && b < 0) { - product = MaxIntValue[T]() + product = MaxSignedValue[T]() } else { - product = MinIntValue[T]() + product = MinSignedValue[T]() } } return product @@ -381,8 +371,8 @@ func SaturatingCastToUint(value *big.Int) uint64 { // Negates an int without underflow func SaturatingNeg[T Signed](value T) T { - if value < 0 && value == MinIntValue[T]() { - return MaxIntValue[T]() + if value < 0 && value == MinSignedValue[T]() { + return MaxSignedValue[T]() } return -value } diff --git a/util/arbmath/math_fuzz_test.go b/util/arbmath/math_fuzz_test.go index 6e27f2b70a..591d699de0 100644 --- a/util/arbmath/math_fuzz_test.go +++ b/util/arbmath/math_fuzz_test.go @@ -13,12 +13,12 @@ func toBig[T Signed](a T) *big.Int { } func saturatingBigToInt[T Signed](a *big.Int) T { - // MinIntValue and MaxIntValue are already separately tested - if a.Cmp(toBig(MaxIntValue[T]())) > 0 { - return MaxIntValue[T]() + // MinSignedValue and MaxSignedValue are already separately tested + if a.Cmp(toBig(MaxSignedValue[T]())) > 0 { + return MaxSignedValue[T]() } - if a.Cmp(toBig(MinIntValue[T]())) < 0 { - return MinIntValue[T]() + if a.Cmp(toBig(MinSignedValue[T]())) < 0 { + return MinSignedValue[T]() } return T(a.Int64()) } diff --git a/util/arbmath/math_test.go b/util/arbmath/math_test.go index 194d6d7c86..1be60dc58b 100644 --- a/util/arbmath/math_test.go +++ b/util/arbmath/math_test.go @@ -121,26 +121,22 @@ func TestSlices(t *testing.T) { assert_eq(SliceWithRunoff(data, 7, 8), []uint8{}) } -func testMinMaxValues[T Integer](t *testing.T, min T, max T) { - gotMin := MinIntValue[T]() +func testMinMaxSignedValues[T Signed](t *testing.T, min T, max T) { + gotMin := MinSignedValue[T]() if gotMin != min { Fail(t, "expected min", min, "but got", gotMin) } - gotMax := MaxIntValue[T]() + gotMax := MaxSignedValue[T]() if gotMax != max { Fail(t, "expected max", max, "but got", gotMax) } } -func TestMinMaxValues(t *testing.T) { - testMinMaxValues[uint8](t, 0, math.MaxUint8) - testMinMaxValues[uint16](t, 0, math.MaxUint16) - testMinMaxValues[uint32](t, 0, math.MaxUint32) - testMinMaxValues[uint64](t, 0, math.MaxUint64) - testMinMaxValues[int8](t, math.MinInt8, math.MaxInt8) - testMinMaxValues[int16](t, math.MinInt16, math.MaxInt16) - testMinMaxValues[int32](t, math.MinInt32, math.MaxInt32) - testMinMaxValues[int64](t, math.MinInt64, math.MaxInt64) +func TestMinMaxSignedValues(t *testing.T) { + testMinMaxSignedValues[int8](t, math.MinInt8, math.MaxInt8) + testMinMaxSignedValues[int16](t, math.MinInt16, math.MaxInt16) + testMinMaxSignedValues[int32](t, math.MinInt32, math.MaxInt32) + testMinMaxSignedValues[int64](t, math.MinInt64, math.MaxInt64) } func TestSaturatingAdd(t *testing.T) { From bf8f40a57cefea50899bb15caa989c6ec0bc40c6 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 18:09:52 +0200 Subject: [PATCH 22/30] Skip tls verification when making requests to secure signer from Dataposter --- arbnode/dataposter/data_poster.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 7bc18a2121..04789d0fda 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -217,6 +217,10 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro func rpcClient(ctx context.Context, opts *ExternalSignerCfg) (*rpc.Client, error) { tlsCfg := &tls.Config{ MinVersion: tls.VersionTLS12, + // Dataposter verifies that signed transaction was signed by the account + // that it expects to be signed with. So signer is already authenticated + // on application level and does not need to rely on TLS for authentication. + InsecureSkipVerify: true, // #nosec G402 } if opts.ClientCert != "" && opts.ClientPrivateKey != "" { From 761e98dfe76f381b30fac569ce6f16d3b36e6f3c Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 20:23:00 +0200 Subject: [PATCH 23/30] Expose InsecureSkipVerify as a flag in external signer config --- arbnode/dataposter/data_poster.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 04789d0fda..35826620f8 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -220,7 +220,7 @@ func rpcClient(ctx context.Context, opts *ExternalSignerCfg) (*rpc.Client, error // Dataposter verifies that signed transaction was signed by the account // that it expects to be signed with. So signer is already authenticated // on application level and does not need to rely on TLS for authentication. - InsecureSkipVerify: true, // #nosec G402 + InsecureSkipVerify: opts.InsecureSkipVerify, // #nosec G402 } if opts.ClientCert != "" && opts.ClientPrivateKey != "" { @@ -1227,6 +1227,8 @@ type ExternalSignerCfg struct { // (Optional) Client certificate key for mtls. // This is required when client-cert is set. ClientPrivateKey string `koanf:"client-private-key"` + // TLS config option, when enabled skips certificate verification of external signer. + InsecureSkipVerify bool `koanf:"insecure-skip-verify"` } type DangerousConfig struct { @@ -1280,6 +1282,7 @@ func addExternalSignerOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".root-ca", DefaultDataPosterConfig.ExternalSigner.RootCA, "external signer root CA") f.String(prefix+".client-cert", DefaultDataPosterConfig.ExternalSigner.ClientCert, "rpc client cert") f.String(prefix+".client-private-key", DefaultDataPosterConfig.ExternalSigner.ClientPrivateKey, "rpc client private key") + f.Bool(prefix+".client-private-key", DefaultDataPosterConfig.ExternalSigner.InsecureSkipVerify, "skip TLS certificate verification") } var DefaultDataPosterConfig = DataPosterConfig{ @@ -1301,7 +1304,7 @@ var DefaultDataPosterConfig = DataPosterConfig{ UseNoOpStorage: false, LegacyStorageEncoding: false, Dangerous: DangerousConfig{ClearDBStorage: false}, - ExternalSigner: ExternalSignerCfg{Method: "eth_signTransaction"}, + ExternalSigner: ExternalSignerCfg{Method: "eth_signTransaction", InsecureSkipVerify: true}, MaxFeeCapFormula: "((BacklogOfBatches * UrgencyGWei) ** 2) + ((ElapsedTime/ElapsedTimeBase) ** 2) * ElapsedTimeImportance + TargetPriceGWei", ElapsedTimeBase: 10 * time.Minute, ElapsedTimeImportance: 10, @@ -1334,7 +1337,7 @@ var TestDataPosterConfig = DataPosterConfig{ UseDBStorage: false, UseNoOpStorage: false, LegacyStorageEncoding: false, - ExternalSigner: ExternalSignerCfg{Method: "eth_signTransaction"}, + ExternalSigner: ExternalSignerCfg{Method: "eth_signTransaction", InsecureSkipVerify: true}, MaxFeeCapFormula: "((BacklogOfBatches * UrgencyGWei) ** 2) + ((ElapsedTime/ElapsedTimeBase) ** 2) * ElapsedTimeImportance + TargetPriceGWei", ElapsedTimeBase: 10 * time.Minute, ElapsedTimeImportance: 10, From b8503d8c2581d1c065800b01bc8be7740e41a9af Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 20:24:10 +0200 Subject: [PATCH 24/30] Fix flag initialization --- arbnode/dataposter/data_poster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 35826620f8..8137cbac60 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -1282,7 +1282,7 @@ func addExternalSignerOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".root-ca", DefaultDataPosterConfig.ExternalSigner.RootCA, "external signer root CA") f.String(prefix+".client-cert", DefaultDataPosterConfig.ExternalSigner.ClientCert, "rpc client cert") f.String(prefix+".client-private-key", DefaultDataPosterConfig.ExternalSigner.ClientPrivateKey, "rpc client private key") - f.Bool(prefix+".client-private-key", DefaultDataPosterConfig.ExternalSigner.InsecureSkipVerify, "skip TLS certificate verification") + f.Bool(prefix+".insecure-skip-verify", DefaultDataPosterConfig.ExternalSigner.InsecureSkipVerify, "skip TLS certificate verification") } var DefaultDataPosterConfig = DataPosterConfig{ From 7b7159cb7de9ee78d318a4ea1f4f74df5781fbf7 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 16 May 2024 20:25:59 +0200 Subject: [PATCH 25/30] Keep insecureSkipVerify false by default in prod config --- arbnode/dataposter/data_poster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 8137cbac60..fb35ac3c8d 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -1304,7 +1304,7 @@ var DefaultDataPosterConfig = DataPosterConfig{ UseNoOpStorage: false, LegacyStorageEncoding: false, Dangerous: DangerousConfig{ClearDBStorage: false}, - ExternalSigner: ExternalSignerCfg{Method: "eth_signTransaction", InsecureSkipVerify: true}, + ExternalSigner: ExternalSignerCfg{Method: "eth_signTransaction", InsecureSkipVerify: false}, MaxFeeCapFormula: "((BacklogOfBatches * UrgencyGWei) ** 2) + ((ElapsedTime/ElapsedTimeBase) ** 2) * ElapsedTimeImportance + TargetPriceGWei", ElapsedTimeBase: 10 * time.Minute, ElapsedTimeImportance: 10, From f8dcce964279ede50b1795cbc93b6ea0fdf4cb81 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 16 May 2024 13:35:52 -0500 Subject: [PATCH 26/30] Fix lastUpdateTimeOffset -> ArbitrumStartTime --- arbos/programs/data_pricer.go | 8 +++++--- arbos/programs/programs.go | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/arbos/programs/data_pricer.go b/arbos/programs/data_pricer.go index b0184d7dc7..ed7c98556d 100644 --- a/arbos/programs/data_pricer.go +++ b/arbos/programs/data_pricer.go @@ -27,12 +27,14 @@ const ( inertiaOffset ) +const ArbitrumStartTime = 1421388000 // the day it all began + const initialDemand = 0 // no demand const InitialHourlyBytes = 1 * (1 << 40) / (365 * 24) // 1Tb total footprint const initialBytesPerSecond = InitialHourlyBytes / (60 * 60) // refill each second -const initialLastUpdateTime = 1421388000 // the day it all began -const initialMinPrice = 82928201 // 5Mb = $1 -const initialInertia = 21360419 // expensive at 1Tb +const initialLastUpdateTime = ArbitrumStartTime +const initialMinPrice = 82928201 // 5Mb = $1 +const initialInertia = 21360419 // expensive at 1Tb func initDataPricer(sto *storage.Storage) { demand := sto.OpenStorageBackedUint32(demandOffset) diff --git a/arbos/programs/programs.go b/arbos/programs/programs.go index d3113ae98d..9d51172986 100644 --- a/arbos/programs/programs.go +++ b/arbos/programs/programs.go @@ -527,12 +527,12 @@ func (status userStatus) toResult(data []byte, debug bool) ([]byte, string, erro // Hours since Arbitrum began, rounded down. func hoursSinceArbitrum(time uint64) uint24 { - return uint24((time - lastUpdateTimeOffset) / 3600) + return uint24((time - ArbitrumStartTime) / 3600) } // Computes program age in seconds from the hours passed since Arbitrum began. func hoursToAge(time uint64, hours uint24) uint64 { seconds := am.SaturatingUMul(uint64(hours), 3600) - activatedAt := am.SaturatingUAdd(lastUpdateTimeOffset, seconds) + activatedAt := am.SaturatingUAdd(ArbitrumStartTime, seconds) return am.SaturatingUSub(time, activatedAt) } From f301094d5dd0019c1906e9f98a4cabbc720aea46 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Thu, 16 May 2024 15:03:25 -0500 Subject: [PATCH 27/30] Use SaturatingUSub for hoursSinceArbitrum --- arbos/programs/programs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbos/programs/programs.go b/arbos/programs/programs.go index 9d51172986..6f73e16b85 100644 --- a/arbos/programs/programs.go +++ b/arbos/programs/programs.go @@ -527,7 +527,7 @@ func (status userStatus) toResult(data []byte, debug bool) ([]byte, string, erro // Hours since Arbitrum began, rounded down. func hoursSinceArbitrum(time uint64) uint24 { - return uint24((time - ArbitrumStartTime) / 3600) + return am.SaturatingUUCast[uint24]((am.SaturatingUSub(time, ArbitrumStartTime)) / 3600) } // Computes program age in seconds from the hours passed since Arbitrum began. From 804e4fa75860e4dd482f0b1e59844e260d277ba4 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Fri, 17 May 2024 11:25:33 -0300 Subject: [PATCH 28/30] fix: CleanCacheSize from hashdb.Config expects a value defined in bytes, and not as in MB as TrieCleanLimit is defined --- cmd/staterecovery/staterecovery.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/staterecovery/staterecovery.go b/cmd/staterecovery/staterecovery.go index 6390826a91..58ad06ad14 100644 --- a/cmd/staterecovery/staterecovery.go +++ b/cmd/staterecovery/staterecovery.go @@ -31,7 +31,7 @@ func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheCon return fmt.Errorf("start block parent is missing, parent block number: %d", current-1) } hashConfig := *hashdb.Defaults - hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit + hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit * 1024 * 1024 trieConfig := &trie.Config{ Preimages: false, HashDB: &hashConfig, From faa405c6799d852a2a9d7cfb38e7688464627d97 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Mon, 20 May 2024 10:04:41 -0300 Subject: [PATCH 29/30] adjust error and log msg to use 'expected blockhashes' instead of 'last batch messages block hashes' when pruning block hashes from the db --- arbnode/message_pruner.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go index c31dbc496d..5d18341a27 100644 --- a/arbnode/message_pruner.go +++ b/arbnode/message_pruner.go @@ -118,10 +118,10 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error { prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount)) if err != nil { - return fmt.Errorf("error deleting last batch messages' block hashes: %w", err) + return fmt.Errorf("error deleting expected block hashes: %w", err) } if len(prunedKeysRange) > 0 { - log.Info("Pruned last batch messages' block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1]) + log.Info("Pruned expected block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1]) } prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount)) From 7f8d471028d3093c5110dc429d5f5d9fddfe5878 Mon Sep 17 00:00:00 2001 From: Diego Ximenes Date: Mon, 20 May 2024 10:06:20 -0300 Subject: [PATCH 30/30] fix: uses arbnode.BlockHashMismatchLogMsg instead of block_hash_mismatch --- system_tests/seqfeed_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index 589a48d3af..ab30598b60 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -84,8 +84,8 @@ func TestSequencerFeed(t *testing.T) { t.Fatal("Unexpected balance:", l2balance) } - if logHandler.WasLogged("block_hash_mismatch") { - t.Fatal("block_hash_mismatch was logged unexpectedly") + if logHandler.WasLogged(arbnode.BlockHashMismatchLogMsg) { + t.Fatal("BlockHashMismatchLogMsg was logged unexpectedly") } }