Skip to content

Commit

Permalink
moves blockHash to BroadcastFeedMessage again
Browse files Browse the repository at this point in the history
  • Loading branch information
diegoximenes committed Apr 25, 2024
1 parent eaf6a8c commit a679e42
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 53 deletions.
2 changes: 1 addition & 1 deletion arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
if err != nil {
return fmt.Errorf("error getting message %v: %w", seqNum, err)
}
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum)
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, nil)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
Expand Down
8 changes: 4 additions & 4 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,12 +997,12 @@ func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex,
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex) {
n.TxStreamer.BroadcastMessage(msg, pos)
func (n *Node) BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex, msgResult execution.MessageResult) {
n.TxStreamer.BroadcastMessage(msg, pos, msgResult)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult)
}

func (n *Node) ExpectChosenSequencer() error {
Expand Down
30 changes: 17 additions & 13 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,25 +773,22 @@ func (s *TransactionStreamer) countDuplicateMessages(
}
var duplicateMessage bool
if nextMessage.Message != nil {
if dbMessageParsed.Message.BatchGasCost == nil || nextMessage.Message.BatchGasCost == nil || dbMessageParsed.L2BlockHash == nil || nextMessage.L2BlockHash == nil {
// Remove both of the batch gas costs and l2 block hashes and see if the messages still differ
if dbMessageParsed.Message.BatchGasCost == nil || nextMessage.Message.BatchGasCost == nil {
// Remove both of the batch gas costs and see if the messages still differ
nextMessageCopy := nextMessage
nextMessageCopy.Message = new(arbostypes.L1IncomingMessage)
*nextMessageCopy.Message = *nextMessage.Message

batchGasCostBkup := dbMessageParsed.Message.BatchGasCost
l2BlockHashBkup := dbMessageParsed.L2BlockHash

dbMessageParsed.Message.BatchGasCost = nil
dbMessageParsed.L2BlockHash = nil
nextMessageCopy.Message.BatchGasCost = nil
nextMessageCopy.L2BlockHash = nil

if reflect.DeepEqual(dbMessageParsed, nextMessageCopy) {
// Actually this isn't a reorg; only the batch gas costs or l2 block hashes differed
// Actually this isn't a reorg; only the batch gas costs differed
duplicateMessage = true
// If possible - update the message in the database to add the gas cost and l2 block hashes.
if batch != nil && (nextMessage.Message.BatchGasCost != nil || nextMessage.L2BlockHash != nil) {
// If possible - update the message in the database to add the gas cost cache.
if batch != nil && nextMessage.Message.BatchGasCost != nil {
if *batch == nil {
*batch = s.db.NewBatch()
}
Expand All @@ -801,7 +798,6 @@ func (s *TransactionStreamer) countDuplicateMessages(
}
}
dbMessageParsed.Message.BatchGasCost = batchGasCostBkup
dbMessageParsed.L2BlockHash = l2BlockHashBkup
}
}

Expand Down Expand Up @@ -973,7 +969,11 @@ func (s *TransactionStreamer) ExpectChosenSequencer() error {
return nil
}

func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
func (s *TransactionStreamer) WriteMessageFromSequencer(
pos arbutil.MessageIndex,
msgWithMeta arbostypes.MessageWithMetadata,
msgResult execution.MessageResult,
) error {
if err := s.ExpectChosenSequencer(); err != nil {
return err
}
Expand All @@ -1000,7 +1000,7 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex
if err := s.writeMessages(pos, []arbostypes.MessageWithMetadata{msgWithMeta}, nil); err != nil {
return err
}
s.BroadcastMessage(msgWithMeta, pos)
s.BroadcastMessage(msgWithMeta, pos, msgResult)

return nil
}
Expand Down Expand Up @@ -1030,11 +1030,15 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty
return batch.Put(key, msgBytes)
}

func (s *TransactionStreamer) BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex) {
func (s *TransactionStreamer) BroadcastMessage(
msg arbostypes.MessageWithMetadata,
pos arbutil.MessageIndex,
msgResult execution.MessageResult,
) {
if s.broadcastServer == nil {
return
}
if err := s.broadcastServer.BroadcastSingle(msg, pos); err != nil {
if err := s.broadcastServer.BroadcastSingle(msg, pos, &msgResult.BlockHash); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}
Expand Down
1 change: 0 additions & 1 deletion arbos/arbostypes/messagewithmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ var uniquifyingPrefix = []byte("Arbitrum Nitro Feed:")
type MessageWithMetadata struct {
Message *L1IncomingMessage `json:"message"`
DelayedMessagesRead uint64 `json:"delayedMessagesRead"`
L2BlockHash *common.Hash `json:"l2BlockHash,omitempty" rlp:"nilList,optional"`
}

var EmptyTestMessageWithMetadata = MessageWithMetadata{
Expand Down
12 changes: 6 additions & 6 deletions broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func testReceiveMessages(t *testing.T, clientCompression bool, serverCompression

go func() {
for i := 0; i < messageCount; i++ {
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i)))
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil))
}
}()

Expand Down Expand Up @@ -156,7 +156,7 @@ func TestInvalidSignature(t *testing.T) {

go func() {
for i := 0; i < messageCount; i++ {
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i)))
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil))
}
}()

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestServerClientDisconnect(t *testing.T) {
broadcastClient.Start(ctx)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))

// Wait for client to receive batch to ensure it is connected
timer := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) {
broadcastClient.Start(ctx)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))

// Wait for client to receive batch to ensure it is connected
timer := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -724,8 +724,8 @@ func TestBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) {
Require(t, b.Start(ctx))
defer b.StopAndWait()

Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil))

var wg sync.WaitGroup
for i := 0; i < 2; i++ {
Expand Down
16 changes: 13 additions & 3 deletions broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gobwas/ws"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand Down Expand Up @@ -38,7 +39,11 @@ func NewBroadcaster(config wsbroadcastserver.BroadcasterConfigFetcher, chainId u
}
}

func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMetadata, sequenceNumber arbutil.MessageIndex) (*m.BroadcastFeedMessage, error) {
func (b *Broadcaster) NewBroadcastFeedMessage(
message arbostypes.MessageWithMetadata,
sequenceNumber arbutil.MessageIndex,
blockHash *common.Hash,
) (*m.BroadcastFeedMessage, error) {
var messageSignature []byte
if b.dataSigner != nil {
hash, err := message.Hash(sequenceNumber, b.chainId)
Expand All @@ -54,18 +59,23 @@ func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMeta
return &m.BroadcastFeedMessage{
SequenceNumber: sequenceNumber,
Message: message,
BlockHash: blockHash,
Signature: messageSignature,
}, nil
}

func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) {
func (b *Broadcaster) BroadcastSingle(
msg arbostypes.MessageWithMetadata,
seq arbutil.MessageIndex,
blockHash *common.Hash,
) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("recovered error in BroadcastSingle", "recover", r, "backtrace", string(debug.Stack()))
err = errors.New("panic in BroadcastSingle")
}
}()
bfm, err := b.NewBroadcastFeedMessage(msg, seq)
bfm, err := b.NewBroadcastFeedMessage(msg, seq, blockHash)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions broadcaster/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) {
}

// Normal broadcasting and confirming
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil))
waitUntilUpdated(t, expectMessageCount(1, "after 1 message"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 2))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 2, nil))
waitUntilUpdated(t, expectMessageCount(2, "after 2 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 3))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 3, nil))
waitUntilUpdated(t, expectMessageCount(3, "after 3 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 4))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 4, nil))
waitUntilUpdated(t, expectMessageCount(4, "after 4 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 5))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 5, nil))
waitUntilUpdated(t, expectMessageCount(5, "after 4 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 6))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 6, nil))
waitUntilUpdated(t, expectMessageCount(6, "after 4 messages"))

b.Confirm(4)
Expand All @@ -96,7 +96,7 @@ func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) {
"nothing changed because confirmed sequence number before cache"))

b.Confirm(5)
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 7))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 7, nil))
waitUntilUpdated(t, expectMessageCount(2,
"after 7 messages, 5 cleared by confirm"))

Expand Down
1 change: 1 addition & 0 deletions broadcaster/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type BroadcastMessage struct {
type BroadcastFeedMessage struct {
SequenceNumber arbutil.MessageIndex `json:"sequenceNumber"`
Message arbostypes.MessageWithMetadata `json:"message"`
BlockHash *common.Hash `json:"blockHash,omitempty"`
Signature []byte `json:"signature"`

CumulativeSumMsgSize uint64 `json:"-"`
Expand Down
4 changes: 2 additions & 2 deletions broadcaster/message/message_serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func ExampleBroadcastMessage_broadcastfeedmessageWithBlockHash() {
L2msg: []byte{0xde, 0xad, 0xbe, 0xef},
},
DelayedMessagesRead: 3333,
L2BlockHash: &common.Hash{0: 0xff},
},
BlockHash: &common.Hash{0: 0xff},
Signature: nil,
},
},
Expand All @@ -43,7 +43,7 @@ func ExampleBroadcastMessage_broadcastfeedmessageWithBlockHash() {
encoder := json.NewEncoder(&buf)
_ = encoder.Encode(msg)
fmt.Println(buf.String())
// Output: {"version":1,"messages":[{"sequenceNumber":12345,"message":{"message":{"header":{"kind":0,"sender":"0x0000000000000000000000000000000000000000","blockNumber":0,"timestamp":0,"requestId":"0x0000000000000000000000000000000000000000000000000000000000000000","baseFeeL1":0},"l2Msg":"3q2+7w=="},"delayedMessagesRead":3333,"l2BlockHash":"0xff00000000000000000000000000000000000000000000000000000000000000"},"signature":null}]}
// Output: {"version":1,"messages":[{"sequenceNumber":12345,"message":{"message":{"header":{"kind":0,"sender":"0x0000000000000000000000000000000000000000","blockNumber":0,"timestamp":0,"requestId":"0x0000000000000000000000000000000000000000000000000000000000000000","baseFeeL1":0},"l2Msg":"3q2+7w=="},"delayedMessagesRead":3333},"blockHash":"0xff00000000000000000000000000000000000000000000000000000000000000","signature":null}]}
}

func ExampleBroadcastMessage_broadcastfeedmessageWithoutBlockHash() {
Expand Down
30 changes: 16 additions & 14 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,19 +358,20 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.
return nil, err
}

l2BlockHash := block.Hash()
pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
if err != nil {
return nil, err
}

msgWithMeta := arbostypes.MessageWithMetadata{
Message: msg,
DelayedMessagesRead: delayedMessagesRead,
L2BlockHash: &l2BlockHash,
}

pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
if err != nil {
return nil, err
msgResult := execution.MessageResult{
BlockHash: block.Hash(),
}

err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta)
err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta, msgResult)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -423,10 +424,11 @@ func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostyp
}
blockCalcTime := time.Since(startTime)

blockHash := block.Hash()
messageWithMeta.L2BlockHash = &blockHash
msgResult := execution.MessageResult{
BlockHash: block.Hash(),
}

err = s.consensus.WriteMessageFromSequencer(lastMsg+1, messageWithMeta)
err = s.consensus.WriteMessageFromSequencer(lastMsg+1, messageWithMeta, msgResult)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -632,10 +634,10 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
}

if s.consensus != nil {
l2BlockHash := block.Hash()
msg.L2BlockHash = &l2BlockHash

s.consensus.BroadcastMessage(*msg, num)
msgResult := execution.MessageResult{
BlockHash: block.Hash(),
}
s.consensus.BroadcastMessage(*msg, num, msgResult)
}

err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
Expand Down
4 changes: 2 additions & 2 deletions execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type ConsensusInfo interface {
}

type ConsensusSequencer interface {
BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex)
WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error
BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex, msgResult MessageResult)
WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult MessageResult) error
ExpectChosenSequencer() error
CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64)
BacklogL1GasCharged() uint64
Expand Down

0 comments on commit a679e42

Please sign in to comment.