Skip to content

Commit

Permalink
Merge pull request #2240 from OffchainLabs/block_hash_sequencer_feed
Browse files Browse the repository at this point in the history
[NIT-2407] Adds blocks' hashes to the sequencer's feed
  • Loading branch information
tsahee authored May 8, 2024
2 parents 1b74fae + 0cd3618 commit cb74941
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 62 deletions.
9 changes: 8 additions & 1 deletion arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,14 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
if err != nil {
return fmt.Errorf("error getting message %v: %w", seqNum, err)
}
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum)

msgResult, err := t.txStreamer.ResultAtCount(seqNum)
var blockHash *common.Hash
if err == nil {
blockHash = &msgResult.BlockHash
}

feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
Expand Down
4 changes: 2 additions & 2 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,8 @@ func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex,
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult)
}

func (n *Node) ExpectChosenSequencer() error {
Expand Down
51 changes: 42 additions & 9 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,20 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
s.reorgMutex.Lock()
defer s.reorgMutex.Unlock()

err = s.exec.Reorg(count, newMessages, oldMessages)
messagesResults, err := s.exec.Reorg(count, newMessages, oldMessages)
if err != nil {
return err
}

messagesWithBlockHash := make([]broadcaster.MessageWithMetadataAndBlockHash, 0, len(messagesResults))
for i := 0; i < len(messagesResults); i++ {
messagesWithBlockHash = append(messagesWithBlockHash, broadcaster.MessageWithMetadataAndBlockHash{
Message: newMessages[i],
BlockHash: &messagesResults[i].BlockHash,
})
}
s.broadcastMessages(messagesWithBlockHash, count)

if s.validator != nil {
err = s.validator.Reorg(s.GetContext(), count)
if err != nil {
Expand Down Expand Up @@ -970,7 +979,11 @@ func (s *TransactionStreamer) ExpectChosenSequencer() error {
return nil
}

func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
func (s *TransactionStreamer) WriteMessageFromSequencer(
pos arbutil.MessageIndex,
msgWithMeta arbostypes.MessageWithMetadata,
msgResult execution.MessageResult,
) error {
if err := s.ExpectChosenSequencer(); err != nil {
return err
}
Expand Down Expand Up @@ -998,6 +1011,12 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(pos arbutil.MessageIndex
return err
}

msgWithBlockHash := broadcaster.MessageWithMetadataAndBlockHash{
Message: msgWithMeta,
BlockHash: &msgResult.BlockHash,
}
s.broadcastMessages([]broadcaster.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)

return nil
}

Expand Down Expand Up @@ -1026,6 +1045,18 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty
return batch.Put(key, msgBytes)
}

func (s *TransactionStreamer) broadcastMessages(
msgs []broadcaster.MessageWithMetadataAndBlockHash,
pos arbutil.MessageIndex,
) {
if s.broadcastServer == nil {
return
}
if err := s.broadcastServer.BroadcastMessages(msgs, pos); err != nil {
log.Error("failed broadcasting messages", "pos", pos, "err", err)
}
}

// The mutex must be held, and pos must be the latest message count.
// `batch` may be nil, which initializes a new batch. The batch is closed out in this function.
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
Expand Down Expand Up @@ -1053,12 +1084,6 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [
default:
}

if s.broadcastServer != nil {
if err := s.broadcastServer.BroadcastMessages(messages, pos); err != nil {
log.Error("failed broadcasting message", "pos", pos, "err", err)
}
}

return nil
}

Expand Down Expand Up @@ -1110,14 +1135,22 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context, exec execution
}
msgForPrefetch = msg
}
if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil {
msgResult, err := s.exec.DigestMessage(pos, msg, msgForPrefetch)
if err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
logger = log.Debug
}
logger("feedOneMsg failed to send message to execEngine", "err", err, "pos", pos)
return false
}

msgWithBlockHash := broadcaster.MessageWithMetadataAndBlockHash{
Message: *msg,
BlockHash: &msgResult.BlockHash,
}
s.broadcastMessages([]broadcaster.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)

return pos+1 < msgCount
}

Expand Down
12 changes: 6 additions & 6 deletions broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func testReceiveMessages(t *testing.T, clientCompression bool, serverCompression

go func() {
for i := 0; i < messageCount; i++ {
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i)))
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil))
}
}()

Expand Down Expand Up @@ -156,7 +156,7 @@ func TestInvalidSignature(t *testing.T) {

go func() {
for i := 0; i < messageCount; i++ {
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i)))
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil))
}
}()

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestServerClientDisconnect(t *testing.T) {
broadcastClient.Start(ctx)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))

// Wait for client to receive batch to ensure it is connected
timer := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) {
broadcastClient.Start(ctx)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))

// Wait for client to receive batch to ensure it is connected
timer := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -724,8 +724,8 @@ func TestBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) {
Require(t, b.Start(ctx))
defer b.StopAndWait()

Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil))

var wg sync.WaitGroup
for i := 0; i < 2; i++ {
Expand Down
30 changes: 24 additions & 6 deletions broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gobwas/ws"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand All @@ -21,6 +22,11 @@ import (
"github.com/offchainlabs/nitro/wsbroadcastserver"
)

type MessageWithMetadataAndBlockHash struct {
Message arbostypes.MessageWithMetadata
BlockHash *common.Hash
}

type Broadcaster struct {
server *wsbroadcastserver.WSBroadcastServer
backlog backlog.Backlog
Expand All @@ -38,7 +44,11 @@ func NewBroadcaster(config wsbroadcastserver.BroadcasterConfigFetcher, chainId u
}
}

func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMetadata, sequenceNumber arbutil.MessageIndex) (*m.BroadcastFeedMessage, error) {
func (b *Broadcaster) NewBroadcastFeedMessage(
message arbostypes.MessageWithMetadata,
sequenceNumber arbutil.MessageIndex,
blockHash *common.Hash,
) (*m.BroadcastFeedMessage, error) {
var messageSignature []byte
if b.dataSigner != nil {
hash, err := message.Hash(sequenceNumber, b.chainId)
Expand All @@ -54,18 +64,23 @@ func (b *Broadcaster) NewBroadcastFeedMessage(message arbostypes.MessageWithMeta
return &m.BroadcastFeedMessage{
SequenceNumber: sequenceNumber,
Message: message,
BlockHash: blockHash,
Signature: messageSignature,
}, nil
}

func (b *Broadcaster) BroadcastSingle(msg arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) {
func (b *Broadcaster) BroadcastSingle(
msg arbostypes.MessageWithMetadata,
seq arbutil.MessageIndex,
blockHash *common.Hash,
) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("recovered error in BroadcastSingle", "recover", r, "backtrace", string(debug.Stack()))
err = errors.New("panic in BroadcastSingle")
}
}()
bfm, err := b.NewBroadcastFeedMessage(msg, seq)
bfm, err := b.NewBroadcastFeedMessage(msg, seq, blockHash)
if err != nil {
return err
}
Expand All @@ -82,16 +97,19 @@ func (b *Broadcaster) BroadcastSingleFeedMessage(bfm *m.BroadcastFeedMessage) {
b.BroadcastFeedMessages(broadcastFeedMessages)
}

func (b *Broadcaster) BroadcastMessages(messages []arbostypes.MessageWithMetadata, seq arbutil.MessageIndex) (err error) {
func (b *Broadcaster) BroadcastMessages(
messagesWithBlockHash []MessageWithMetadataAndBlockHash,
seq arbutil.MessageIndex,
) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("recovered error in BroadcastMessages", "recover", r, "backtrace", string(debug.Stack()))
err = errors.New("panic in BroadcastMessages")
}
}()
var feedMessages []*m.BroadcastFeedMessage
for i, msg := range messages {
bfm, err := b.NewBroadcastFeedMessage(msg, seq+arbutil.MessageIndex(i))
for i, msg := range messagesWithBlockHash {
bfm, err := b.NewBroadcastFeedMessage(msg.Message, seq+arbutil.MessageIndex(i), msg.BlockHash)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions broadcaster/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) {
}

// Normal broadcasting and confirming
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil))
waitUntilUpdated(t, expectMessageCount(1, "after 1 message"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 2))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 2, nil))
waitUntilUpdated(t, expectMessageCount(2, "after 2 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 3))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 3, nil))
waitUntilUpdated(t, expectMessageCount(3, "after 3 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 4))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 4, nil))
waitUntilUpdated(t, expectMessageCount(4, "after 4 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 5))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 5, nil))
waitUntilUpdated(t, expectMessageCount(5, "after 4 messages"))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 6))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 6, nil))
waitUntilUpdated(t, expectMessageCount(6, "after 4 messages"))

b.Confirm(4)
Expand All @@ -96,7 +96,7 @@ func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) {
"nothing changed because confirmed sequence number before cache"))

b.Confirm(5)
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 7))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 7, nil))
waitUntilUpdated(t, expectMessageCount(2,
"after 7 messages, 5 cleared by confirm"))

Expand Down
1 change: 1 addition & 0 deletions broadcaster/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type BroadcastMessage struct {
type BroadcastFeedMessage struct {
SequenceNumber arbutil.MessageIndex `json:"sequenceNumber"`
Message arbostypes.MessageWithMetadata `json:"message"`
BlockHash *common.Hash `json:"blockHash,omitempty"`
Signature []byte `json:"signature"`

CumulativeSumMsgSize uint64 `json:"-"`
Expand Down
35 changes: 34 additions & 1 deletion broadcaster/message/message_serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,40 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
)

func ExampleBroadcastMessage_broadcastfeedmessage() {
func ExampleBroadcastMessage_broadcastfeedmessageWithBlockHash() {
var requestId common.Hash
msg := BroadcastMessage{
Version: 1,
Messages: []*BroadcastFeedMessage{
{
SequenceNumber: 12345,
Message: arbostypes.MessageWithMetadata{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: 0,
Poster: [20]byte{},
BlockNumber: 0,
Timestamp: 0,
RequestId: &requestId,
L1BaseFee: big.NewInt(0),
},
L2msg: []byte{0xde, 0xad, 0xbe, 0xef},
},
DelayedMessagesRead: 3333,
},
BlockHash: &common.Hash{0: 0xff},
Signature: nil,
},
},
}
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
_ = encoder.Encode(msg)
fmt.Println(buf.String())
// Output: {"version":1,"messages":[{"sequenceNumber":12345,"message":{"message":{"header":{"kind":0,"sender":"0x0000000000000000000000000000000000000000","blockNumber":0,"timestamp":0,"requestId":"0x0000000000000000000000000000000000000000000000000000000000000000","baseFeeL1":0},"l2Msg":"3q2+7w=="},"delayedMessagesRead":3333},"blockHash":"0xff00000000000000000000000000000000000000000000000000000000000000","signature":null}]}
}

func ExampleBroadcastMessage_broadcastfeedmessageWithoutBlockHash() {
var requestId common.Hash
msg := BroadcastMessage{
Version: 1,
Expand Down
Loading

0 comments on commit cb74941

Please sign in to comment.