Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Before posting a batch, run it through the inbox multiplexer and confirm it produces the expected set of messages #2371

Merged
merged 14 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package arbnode
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -99,6 +100,7 @@ type BatchPoster struct {
gasRefunderAddr common.Address
building *buildingBatch
dapWriter daprovider.Writer
dapReaders []daprovider.Reader
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
Expand Down Expand Up @@ -156,6 +158,7 @@ type BatchPosterConfig struct {
L1BlockBoundBypass time.Duration `koanf:"l1-block-bound-bypass" reload:"hot"`
UseAccessLists bool `koanf:"use-access-lists" reload:"hot"`
GasEstimateBaseFeeMultipleBips arbmath.Bips `koanf:"gas-estimate-base-fee-multiple-bips"`
CheckBatchCorrectness bool `koanf:"check-batch-correctness" reload:"hot"`

gasRefunder common.Address
l1BlockBound l1BlockBound
Expand Down Expand Up @@ -207,6 +210,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".l1-block-bound-bypass", DefaultBatchPosterConfig.L1BlockBoundBypass, "post batches even if not within the layer 1 future bounds if we're within this margin of the max delay")
f.Bool(prefix+".use-access-lists", DefaultBatchPosterConfig.UseAccessLists, "post batches with access lists to reduce gas usage (disabled for L3s)")
f.Uint64(prefix+".gas-estimate-base-fee-multiple-bips", uint64(DefaultBatchPosterConfig.GasEstimateBaseFeeMultipleBips), "for gas estimation, use this multiple of the basefee (measured in basis points) as the max fee per gas")
f.Bool(prefix+".check-batch-correctness", DefaultBatchPosterConfig.CheckBatchCorrectness, "setting this to true will run the batch against an inbox multiplexer and verifies that it produces the correct set of messages")
redislock.AddConfigOptions(prefix+".redis-lock", f)
dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig)
genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname)
Expand Down Expand Up @@ -236,6 +240,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
UseAccessLists: true,
RedisLock: redislock.DefaultCfg,
GasEstimateBaseFeeMultipleBips: arbmath.OneInBips * 3 / 2,
CheckBatchCorrectness: true,
}

var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{
Expand Down Expand Up @@ -266,6 +271,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
L1BlockBoundBypass: time.Hour,
UseAccessLists: true,
GasEstimateBaseFeeMultipleBips: arbmath.OneInBips * 3 / 2,
CheckBatchCorrectness: true,
}

type BatchPosterOpts struct {
Expand All @@ -280,6 +286,7 @@ type BatchPosterOpts struct {
TransactOpts *bind.TransactOpts
DAPWriter daprovider.Writer
ParentChainID *big.Int
DAPReaders []daprovider.Reader
}

func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) {
Expand Down Expand Up @@ -326,6 +333,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
bridgeAddr: opts.DeployInfo.Bridge,
dapWriter: opts.DAPWriter,
redisLock: redisLock,
dapReaders: opts.DAPReaders,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
if err != nil {
Expand Down Expand Up @@ -369,6 +377,41 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
return b, nil
}

type dummyBlobReader struct {
blobs []kzg4844.Blob
}

func (b *dummyBlobReader) GetBlobs(ctx context.Context, batchBlockHash common.Hash, versionedHashes []common.Hash) ([]kzg4844.Blob, error) {
return b.blobs, nil
}
func (b *dummyBlobReader) Initialize(ctx context.Context) error { return nil }

type testMuxBackend struct {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
batchSeqNum uint64
positionWithinMessage uint64
seqMsg []byte
allMsgs map[arbutil.MessageIndex]*arbostypes.MessageWithMetadata
delayedInboxPos int
delayedInbox []*arbostypes.MessageWithMetadata
}

func (b *testMuxBackend) PeekSequencerInbox() ([]byte, common.Hash, error) {
return b.seqMsg, common.Hash{}, nil
}

func (b *testMuxBackend) GetSequencerInboxPosition() uint64 { return b.batchSeqNum }
func (b *testMuxBackend) AdvanceSequencerInbox() {}
func (b *testMuxBackend) GetPositionWithinMessage() uint64 { return b.positionWithinMessage }
func (b *testMuxBackend) SetPositionWithinMessage(pos uint64) { b.positionWithinMessage = pos }

func (b *testMuxBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1IncomingMessage, error) {
if b.delayedInboxPos < len(b.delayedInbox) {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
b.delayedInboxPos++
return b.delayedInbox[b.delayedInboxPos-1].Message, nil
}
return nil, fmt.Errorf("error serving ReadDelayedInbox, all delayed messages were read. Requested delayed message position:%d, Total delayed messages: %d", b.delayedInboxPos, len(b.delayedInbox))
}

type AccessListOpts struct {
SequencerInboxAddr common.Address
BridgeAddr common.Address
Expand Down Expand Up @@ -660,6 +703,7 @@ type buildingBatch struct {
msgCount arbutil.MessageIndex
haveUsefulMessage bool
use4844 bool
muxBackend *testMuxBackend
}

func newBatchSegments(firstDelayed uint64, config *BatchPosterConfig, backlog uint64, use4844 bool) *batchSegments {
Expand Down Expand Up @@ -1099,6 +1143,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
startMsgCount: batchPosition.MessageCount,
use4844: use4844,
}
if b.config().CheckBatchCorrectness {
b.building.muxBackend = &testMuxBackend{
batchSeqNum: batchPosition.NextSeqNum,
allMsgs: make(map[arbutil.MessageIndex]*arbostypes.MessageWithMetadata),
}
}
}
msgCount, err := b.streamer.GetMessageCount()
if err != nil {
Expand Down Expand Up @@ -1212,6 +1262,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
)
break
}
isDelayed := msg.DelayedMessagesRead > b.building.segments.delayedMsg
success, err := b.building.segments.AddMessage(msg)
if err != nil {
// Clear our cache
Expand All @@ -1226,6 +1277,12 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
b.building.haveUsefulMessage = true
break
}
if config.CheckBatchCorrectness {
b.building.muxBackend.allMsgs[b.building.msgCount] = msg
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if isDelayed {
b.building.muxBackend.delayedInbox = append(b.building.muxBackend.delayedInbox, msg)
}
}
if msg.Message.Header.Kind != arbostypes.L1MessageType_BatchPostingReport {
b.building.haveUsefulMessage = true
}
Expand Down Expand Up @@ -1292,6 +1349,36 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if err != nil {
return false, err
}

if config.CheckBatchCorrectness {
dapReaders := b.dapReaders
if b.building.use4844 {
dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(&dummyBlobReader{kzgBlobs}))
}
seqMsg := binary.BigEndian.AppendUint64([]byte{}, l1BoundMinTimestamp)
seqMsg = binary.BigEndian.AppendUint64(seqMsg, l1BoundMaxTimestamp)
seqMsg = binary.BigEndian.AppendUint64(seqMsg, l1BoundMinBlockNumber)
seqMsg = binary.BigEndian.AppendUint64(seqMsg, l1BoundMaxBlockNumber)
seqMsg = binary.BigEndian.AppendUint64(seqMsg, b.building.segments.delayedMsg)
seqMsg = append(seqMsg, sequencerMsg...)
b.building.muxBackend.seqMsg = seqMsg
testMux := arbstate.NewInboxMultiplexer(b.building.muxBackend, batchPosition.DelayedMessageCount, dapReaders, daprovider.KeysetValidate)
log.Info("Begin checking the correctness of batch against inbox multiplexer", "startMsgSeqNum", batchPosition.MessageCount, "endMsgSeqNum", b.building.msgCount-1)
for i := batchPosition.MessageCount; i < b.building.msgCount; i++ {
msg, err := testMux.Pop(ctx)
if err != nil {
return false, fmt.Errorf("error getting message from inbox multiplexer (Pop) when testing correctness of batch: %w", err)
}
if msg.DelayedMessagesRead != b.building.muxBackend.allMsgs[i].DelayedMessagesRead {
return false, fmt.Errorf("when testing correctness of batch inbox multiplexer failed to produce from batch sequencerMsg, a correct delayedMessagesRead field for msg with seqNum: %d. Got: %d, Want: %d", i, msg.DelayedMessagesRead, b.building.muxBackend.allMsgs[i].DelayedMessagesRead)
}
if !msg.Message.Equals(b.building.muxBackend.allMsgs[i].Message) {
return false, fmt.Errorf("when testing correctness of batch inbox multiplexer failed to produce from batch sequencerMsg, a correct message field for msg with seqNum: %d", i)
}
}
log.Info("Successfully checked that the batch produces correct messages when ran through inbox multiplexer")
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}

tx, err := b.dataPoster.PostTransaction(ctx,
firstMsgTime,
nonce,
Expand Down
1 change: 1 addition & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ func createNodeImpl(
TransactOpts: txOptsBatchPoster,
DAPWriter: dapWriter,
ParentChainID: parentChainID,
DAPReaders: dapReaders,
})
if err != nil {
return nil, err
Expand Down
Loading