Skip to content

Commit

Permalink
broadcast messages after producing a block and not after writing them…
Browse files Browse the repository at this point in the history
… in the db
  • Loading branch information
diegoximenes committed Apr 25, 2024
1 parent 8db287c commit eaf6a8c
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 27 deletions.
4 changes: 4 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex,
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex) {
n.TxStreamer.BroadcastMessage(msg, pos)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
}
Expand Down
16 changes: 10 additions & 6 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex
if err := s.writeMessages(pos, []arbostypes.MessageWithMetadata{msgWithMeta}, nil); err != nil {
return err
}
s.BroadcastMessage(msgWithMeta, pos)

return nil
}
Expand Down Expand Up @@ -1029,6 +1030,15 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty
return batch.Put(key, msgBytes)
}

func (s *TransactionStreamer) BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex) {
if s.broadcastServer == nil {
return
}
if err := s.broadcastServer.BroadcastSingle(msg, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

// The mutex must be held, and pos must be the latest message count.
// `batch` may be nil, which initializes a new batch. The batch is closed out in this function.
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
Expand Down Expand Up @@ -1056,12 +1066,6 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [
default:
}

if s.broadcastServer != nil {
if err := s.broadcastServer.BroadcastMessages(messages, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

return nil
}

Expand Down
21 changes: 0 additions & 21 deletions broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,6 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) {
b.BroadcastFeedMessages(broadcastFeedMessages)
}

func (b *Broadcaster) BroadcastMessages(messages []arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("recovered error in BroadcastMessages", "recover", r, "backtrace", string(debug.Stack()))
err = errors.New("panic in BroadcastMessages")
}
}()
var feedMessages []*m.BroadcastFeedMessage
for i, msg := range messages {
bfm, err := b.NewBroadcastFeedMessage(msg, seq+arbutil.MessageIndex(i))
if err != nil {
return err
}
feedMessages = append(feedMessages, bfm)
}

b.BroadcastFeedMessages(feedMessages)

return nil
}

func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage) {

bm := &m.BroadcastMessage{
Expand Down
8 changes: 8 additions & 0 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,14 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
if err != nil {
return err
}

if s.consensus != nil {
l2BlockHash := block.Hash()
msg.L2BlockHash = &l2BlockHash

s.consensus.BroadcastMessage(*msg, num)
}

err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type ConsensusInfo interface {
}

type ConsensusSequencer interface {
BroadcastMessage(msg arbostypes.MessageWithMetadata, pos arbutil.MessageIndex)
WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error
ExpectChosenSequencer() error
CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64)
Expand Down

0 comments on commit eaf6a8c

Please sign in to comment.