Implement 4844 blob reading in replay binary #2093

Merged (5 commits), Jan 23, 2024
Changes from 4 commits
3 changes: 3 additions & 0 deletions .github/workflows/arbitrator-ci.yml
@@ -96,6 +96,9 @@ jobs:
           make -j
           make install
 
+      - name: Install Foundry
+        uses: foundry-rs/foundry-toolchain@v1
+
       - name: Cache cbrotli
         uses: actions/cache@v3
         id: cache-cbrotli
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
@@ -58,6 +58,9 @@ jobs:
        with:
          targets: 'wasm32-unknown-unknown, wasm32-wasi'
 
+      - name: Install Foundry
+        uses: foundry-rs/foundry-toolchain@v1
+
      - name: Cache Build Products
        uses: actions/cache@v3
        with:
6 changes: 4 additions & 2 deletions Dockerfile
@@ -26,13 +26,14 @@ COPY --from=brotli-library-builder /workspace/install/ /
 
 FROM node:16-bullseye-slim as contracts-builder
 RUN apt-get update && \
-    apt-get install -y git python3 make g++
+    apt-get install -y git python3 make g++ curl
+RUN curl -L https://foundry.paradigm.xyz | bash && . ~/.bashrc && ~/.foundry/bin/foundryup
 WORKDIR /workspace
 COPY contracts/package.json contracts/yarn.lock contracts/
 RUN cd contracts && yarn install
 COPY contracts contracts/
 COPY Makefile .
-RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-solidity
+RUN . ~/.bashrc && NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-solidity
 
 FROM debian:bullseye-20211220 as wasm-base
 WORKDIR /workspace
@@ -183,6 +184,7 @@ COPY fastcache/go.mod fastcache/go.sum fastcache/
 RUN go mod download
 COPY . ./
 COPY --from=contracts-builder workspace/contracts/build/ contracts/build/
+COPY --from=contracts-builder workspace/contracts/out/ contracts/out/
 COPY --from=contracts-builder workspace/contracts/node_modules/@offchainlabs/upgrade-executor/build/contracts/src/UpgradeExecutor.sol/UpgradeExecutor.json contracts/node_modules/@offchainlabs/upgrade-executor/build/contracts/src/UpgradeExecutor.sol/
 COPY --from=contracts-builder workspace/.make/ .make/
 COPY --from=prover-header-export / target/
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
@@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
     defer cancel()
 
     exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
-    tracker, err := NewInboxTracker(db, streamer, nil)
+    tracker, err := NewInboxTracker(db, streamer, nil, nil)
     Require(t, err)
 
     err = streamer.Start(ctx)
14 changes: 9 additions & 5 deletions arbnode/inbox_tracker.go
@@ -38,12 +38,13 @@ type InboxTracker struct {
     mutex      sync.Mutex
     validator  *staker.BlockValidator
     das        arbstate.DataAvailabilityReader
+    blobReader arbstate.BlobReader
 
     batchMetaMutex sync.Mutex
     batchMeta      *containers.LruCache[uint64, BatchMetadata]
 }
 
-func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader) (*InboxTracker, error) {
+func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
     // We support a nil txStreamer for the pruning code
     if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
         return nil, errors.New("data availability service required but unconfigured")
@@ -52,6 +53,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb
         db:         db,
         txStreamer: txStreamer,
         das:        das,
+        blobReader: blobReader,
         batchMeta:  containers.NewLruCache[uint64, BatchMetadata](1000),
     }
     return tracker, nil
@@ -504,11 +506,12 @@ type multiplexerBackend struct {
     inbox *InboxTracker
 }
 
-func (b *multiplexerBackend) PeekSequencerInbox() ([]byte, error) {
+func (b *multiplexerBackend) PeekSequencerInbox() ([]byte, common.Hash, error) {
     if len(b.batches) == 0 {
-        return nil, errors.New("read past end of specified sequencer batches")
+        return nil, common.Hash{}, errors.New("read past end of specified sequencer batches")
     }
-    return b.batches[0].Serialize(b.ctx, b.client)
+    bytes, err := b.batches[0].Serialize(b.ctx, b.client)
+    return bytes, b.batches[0].BlockHash, err
 }
 
 func (b *multiplexerBackend) GetSequencerInboxPosition() uint64 {
@@ -603,7 +606,8 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
         ctx:    ctx,
         client: client,
     }
-    multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, arbstate.KeysetValidate)
+
+    multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.blobReader, arbstate.KeysetValidate)
     batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
     currentpos := prevbatchmeta.MessageCount + 1
     for {
16 changes: 15 additions & 1 deletion arbnode/node.go
@@ -26,6 +26,7 @@ import (
     "github.com/offchainlabs/nitro/arbnode/dataposter"
     "github.com/offchainlabs/nitro/arbnode/dataposter/storage"
     "github.com/offchainlabs/nitro/arbnode/resourcemanager"
+    "github.com/offchainlabs/nitro/arbstate"
     "github.com/offchainlabs/nitro/arbutil"
     "github.com/offchainlabs/nitro/broadcastclient"
     "github.com/offchainlabs/nitro/broadcastclients"
@@ -39,6 +40,7 @@ import (
     "github.com/offchainlabs/nitro/solgen/go/rollupgen"
     "github.com/offchainlabs/nitro/staker"
     "github.com/offchainlabs/nitro/staker/validatorwallet"
+    "github.com/offchainlabs/nitro/util/beaconclient"
     "github.com/offchainlabs/nitro/util/contracts"
     "github.com/offchainlabs/nitro/util/headerreader"
     "github.com/offchainlabs/nitro/util/redisutil"
@@ -85,6 +87,7 @@ type Config struct {
     Staker              staker.L1ValidatorConfig   `koanf:"staker" reload:"hot"`
     SeqCoordinator      SeqCoordinatorConfig       `koanf:"seq-coordinator"`
     DataAvailability    das.DataAvailabilityConfig `koanf:"data-availability"`
+    BlobClient          BlobClientConfig           `koanf:"blob-client"`
     SyncMonitor         SyncMonitorConfig          `koanf:"sync-monitor"`
     Dangerous           DangerousConfig            `koanf:"dangerous"`
     TransactionStreamer TransactionStreamerConfig  `koanf:"transaction-streamer" reload:"hot"`
@@ -142,6 +145,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
     staker.L1ValidatorConfigAddOptions(prefix+".staker", f)
     SeqCoordinatorConfigAddOptions(prefix+".seq-coordinator", f)
     das.DataAvailabilityConfigAddNodeOptions(prefix+".data-availability", f)
+    BlobClientAddOptions(prefix+".blob-client", f)
     SyncMonitorConfigAddOptions(prefix+".sync-monitor", f)
     DangerousConfigAddOptions(prefix+".dangerous", f)
     TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
@@ -512,7 +516,17 @@ func createNodeImpl(
         return nil, errors.New("a data availability service is required for this chain, but it was not configured")
     }
 
-    inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader)
+    var blobReader arbstate.BlobReader
+    if config.BlobClient.BeaconChainUrl != "" {
+        bc, err := beaconclient.NewClient(config.BlobClient.BeaconChainUrl)
+        if err != nil {
+            return nil, err
+        }
+
+        blobReader = NewBlobClient(bc, l1client)
+    }
+
+    inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader)
    if err != nil {
        return nil, err
    }
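Editor's note: the BlobClientConfig type and BlobClientAddOptions helper used above are defined in a file that is not part of this diff view. As a rough sketch of the shape implied by this usage (the koanf key, flag description, and pflag-style FlagSet are assumptions, not taken from the PR):

    // Hypothetical sketch, not the actual blob client config from this PR.
    type BlobClientConfig struct {
        BeaconChainUrl string `koanf:"beacon-chain-url"`
    }

    func BlobClientAddOptions(prefix string, f *flag.FlagSet) {
        // Registers e.g. --<prefix>.beacon-chain-url on the node's flag set;
        // an empty URL leaves blobReader nil, as handled in createNodeImpl above.
        f.String(prefix+".beacon-chain-url", "", "URL of the beacon chain node used to fetch EIP-4844 blobs")
    }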
7 changes: 7 additions & 0 deletions arbstate/das_reader.go
@@ -40,6 +40,9 @@ const L1AuthenticatedMessageHeaderFlag byte = 0x40
 // ZeroheavyMessageHeaderFlag indicates that this message is zeroheavy-encoded.
 const ZeroheavyMessageHeaderFlag byte = 0x20
 
+// BlobHashesHeaderFlag indicates that this message contains EIP-4844 versioned hashes of the commitments calculated over the blob data for the batch data.
+const BlobHashesHeaderFlag byte = L1AuthenticatedMessageHeaderFlag | 0x10 // 0x50
+
 // BrotliMessageHeaderByte indicates that the message is brotli-compressed.
 const BrotliMessageHeaderByte byte = 0
 
@@ -55,6 +58,10 @@ func IsZeroheavyEncodedHeaderByte(header byte) bool {
     return (ZeroheavyMessageHeaderFlag & header) > 0
 }
 
+func IsBlobHashesHeaderByte(header byte) bool {
+    return (BlobHashesHeaderFlag & header) > 0
+}
+
 func IsBrotliMessageHeaderByte(b uint8) bool {
     return b == BrotliMessageHeaderByte
 }

Review thread on IsBlobHashesHeaderByte:

Reviewer (Member): Is this right if the flag has more than one 1 bit, like 0x50?

Author (Collaborator): Good point, this isn't right.

Author (Collaborator): Fixed. Great catch.
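Editor's note on the review thread: because BlobHashesHeaderFlag (0x50) has two bits set, the (BlobHashesHeaderFlag & header) > 0 test above also matches header bytes that carry only one of those bits, for example a plain 0x40 L1-authenticated header. One way to make the test exact is to require every bit of the flag, as in the sketch below; the fix that actually landed came in a later commit that is not included in this four-commit view.

    // Sketch of an exact-match check (assumed, not the committed fix):
    // accept the byte only if all bits of BlobHashesHeaderFlag are set.
    func IsBlobHashesHeaderByte(header byte) bool {
        return (BlobHashesHeaderFlag & header) == BlobHashesHeaderFlag
    }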
49 changes: 44 additions & 5 deletions arbstate/inbox.go
@@ -8,11 +8,13 @@ import (
     "context"
     "encoding/binary"
     "errors"
+    "fmt"
     "io"
     "math/big"
 
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/crypto"
+    "github.com/ethereum/go-ethereum/crypto/kzg4844"
     "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/rlp"
 
@@ -21,11 +23,12 @@ import (
     "github.com/offchainlabs/nitro/arbos/l1pricing"
     "github.com/offchainlabs/nitro/arbutil"
     "github.com/offchainlabs/nitro/das/dastree"
+    "github.com/offchainlabs/nitro/util/blobs"
     "github.com/offchainlabs/nitro/zeroheavy"
 )
 
 type InboxBackend interface {
-    PeekSequencerInbox() ([]byte, error)
+    PeekSequencerInbox() ([]byte, common.Hash, error)
 
     GetSequencerInboxPosition() uint64
     AdvanceSequencerInbox()
@@ -36,6 +39,14 @@
     ReadDelayedInbox(seqNum uint64) (*arbostypes.L1IncomingMessage, error)
 }
 
+type BlobReader interface {
+    GetBlobs(
+        ctx context.Context,
+        batchBlockHash common.Hash,
+        versionedHashes []common.Hash,
+    ) ([]kzg4844.Blob, error)
+}
+
 type sequencerMessage struct {
     minTimestamp uint64
     maxTimestamp uint64
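Editor's note: BlobReader is the seam the inbox code programs against; the production implementation (the blob client constructed in arbnode/node.go above) is not among the files shown here. For illustration only, a minimal in-memory stub satisfying the interface could look like the following; the type and its behavior are assumptions for the example, not code from the PR.

    // mapBlobReader is a hypothetical test double serving blobs from a map
    // keyed by their EIP-4844 versioned hash.
    type mapBlobReader map[common.Hash]kzg4844.Blob

    func (m mapBlobReader) GetBlobs(
        ctx context.Context,
        batchBlockHash common.Hash, // unused by this stub
        versionedHashes []common.Hash,
    ) ([]kzg4844.Blob, error) {
        result := make([]kzg4844.Blob, 0, len(versionedHashes))
        for _, h := range versionedHashes {
            blob, ok := m[h]
            if !ok {
                return nil, fmt.Errorf("no blob stored for versioned hash %v", h)
            }
            result = append(result, blob)
        }
        return result, nil
    }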
@@ -50,7 +61,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64
 const MaxSegmentsPerSequencerMessage = 100 * 1024
 const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week
 
-func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
+func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, dasReader DataAvailabilityReader, blobReader BlobReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
     if len(data) < 40 {
         return nil, errors.New("sequencer message missing L1 header")
     }
@@ -79,6 +90,31 @@
         }
     }
 
+    if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
+        blobHashes := payload[1:]
+        if len(blobHashes)%len(common.Hash{}) != 0 {
+            return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
+        }
+        versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{}))
+        for i := 0; i*32 < len(blobHashes); i += 1 {
+            copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32])
+        }
+
+        if blobReader == nil {
+            return nil, errors.New("blob batch payload was encountered but no BlobReader was configured")
+        }
+
+        kzgBlobs, err := blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes)
+        if err != nil {
+            return nil, fmt.Errorf("failed to get blobs: %w", err)
+        }
+        payload, err = blobs.DecodeBlobs(kzgBlobs)
+        if err != nil {
+            log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err)
+            return parsedMsg, nil
+        }
+    }
+
     if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
         pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
         if err != nil {
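Editor's note, to make the new branch above concrete: a blob-carrying batch payload is just the BlobHashesHeaderFlag byte (0x50) followed by a whole number of 32-byte EIP-4844 versioned hashes; the batch data itself lives in the referenced blobs and is recovered via GetBlobs plus blobs.DecodeBlobs. A toy illustration of that layout follows (the helper is assumed for the example, not code from the PR):

    // Illustration only: build a blob-reference payload for some versioned
    // hashes. Real versioned hashes are sha256(KZG commitment) with the
    // first byte replaced by 0x01; here they are placeholders.
    func exampleBlobPayload(versionedHashes []common.Hash) []byte {
        payload := []byte{BlobHashesHeaderFlag} // header byte 0x50
        for _, h := range versionedHashes {
            payload = append(payload, h[:]...) // 32 bytes per hash
        }
        // len(payload) == 1 + 32*len(versionedHashes); parseSequencerMessage
        // strips the header, rebuilds the hashes, and asks the BlobReader for
        // the matching blobs before decoding them back into batch data.
        return payload
    }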
@@ -242,6 +278,7 @@ type inboxMultiplexer struct {
     backend                   InboxBackend
     delayedMessagesRead       uint64
     dasReader                 DataAvailabilityReader
+    blobReader                BlobReader
     cachedSequencerMessage    *sequencerMessage
     cachedSequencerMessageNum uint64
     cachedSegmentNum          uint64
@@ -251,11 +288,12 @@
     keysetValidationMode KeysetValidationMode
 }
 
-func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
+func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, blobReader BlobReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
     return &inboxMultiplexer{
         backend:              backend,
         delayedMessagesRead:  delayedMessagesRead,
         dasReader:            dasReader,
+        blobReader:           blobReader,
         keysetValidationMode: keysetValidationMode,
     }
 }
@@ -270,13 +308,14 @@ const BatchSegmentKindAdvanceL1BlockNumber uint8 = 4
 // Note: this does *not* return parse errors, those are transformed into invalid messages
 func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMetadata, error) {
     if r.cachedSequencerMessage == nil {
-        bytes, realErr := r.backend.PeekSequencerInbox()
+        // Note: batchBlockHash will be zero in the replay binary, but that's fine
+        bytes, batchBlockHash, realErr := r.backend.PeekSequencerInbox()
         if realErr != nil {
             return nil, realErr
         }
         r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition()
         var err error
-        r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.keysetValidationMode)
+        r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.dasReader, r.blobReader, r.keysetValidationMode)
         if err != nil {
             return nil, err
         }
9 changes: 5 additions & 4 deletions arbstate/inbox_fuzz_test.go
@@ -9,6 +9,7 @@ import (
     "errors"
     "testing"
 
+    "github.com/ethereum/go-ethereum/common"
     "github.com/offchainlabs/nitro/arbos/arbostypes"
 )
 
@@ -19,11 +20,11 @@ type multiplexerBackend struct {
     positionWithinMessage uint64
 }
 
-func (b *multiplexerBackend) PeekSequencerInbox() ([]byte, error) {
+func (b *multiplexerBackend) PeekSequencerInbox() ([]byte, common.Hash, error) {
     if b.batchSeqNum != 0 {
-        return nil, errors.New("reading unknown sequencer batch")
+        return nil, common.Hash{}, errors.New("reading unknown sequencer batch")
     }
-    return b.batch, nil
+    return b.batch, common.Hash{}, nil
 }
 
 func (b *multiplexerBackend) GetSequencerInboxPosition() uint64 {
@@ -66,7 +67,7 @@ func FuzzInboxMultiplexer(f *testing.F) {
         delayedMessage:        delayedMsg,
         positionWithinMessage: 0,
     }
-    multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate)
+    multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate)
     _, err := multiplexer.Pop(context.TODO())
     if err != nil {
         panic(err)
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
@@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
         return nil, fmt.Errorf("failed to get finalized block: %w", err)
     }
     l1BlockNum := l1Block.NumberU64()
-    tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
+    tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil)
     if err != nil {
         return nil, err
     }