diff --git a/op-e2e/actions/l2_batcher.go b/op-e2e/actions/l2_batcher.go index 7a0cb40c6aae..25fa5c802527 100644 --- a/op-e2e/actions/l2_batcher.go +++ b/op-e2e/actions/l2_batcher.go @@ -42,6 +42,10 @@ type L1TxAPI interface { SendTransaction(ctx context.Context, tx *types.Transaction) error } +type PlasmaInputSetter interface { + SetInput(ctx context.Context, img []byte) ([]byte, error) +} + type BatcherCfg struct { // Limit the size of txs MinL1TxSize uint64 @@ -53,8 +57,10 @@ type BatcherCfg struct { ForceSubmitSingularBatch bool ForceSubmitSpanBatch bool + UsePlasma bool DataAvailabilityType batcherFlags.DataAvailabilityType + PlasmaDA PlasmaInputSetter } func DefaultBatcherCfg(dp *e2eutils.DeployParams) *BatcherCfg { @@ -66,6 +72,17 @@ func DefaultBatcherCfg(dp *e2eutils.DeployParams) *BatcherCfg { } } +func PlasmaBatcherCfg(dp *e2eutils.DeployParams, plasmaDa PlasmaInputSetter) *BatcherCfg { + return &BatcherCfg{ + MinL1TxSize: 0, + MaxL1TxSize: 128_000, + BatcherKey: dp.Secrets.Batcher, + DataAvailabilityType: batcherFlags.CalldataType, + PlasmaDA: plasmaDa, + UsePlasma: true, + } +} + type L2BlockRefs interface { L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error) } @@ -231,6 +248,13 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic t.Fatalf("failed to output channel data to frame: %v", err) } + payload := data.Bytes() + if s.l2BatcherCfg.UsePlasma { + var err error + payload, err = s.l2BatcherCfg.PlasmaDA.SetInput(context.TODO(), payload) + require.NoError(t, err, "failed to set input for plasma") + } + nonce, err := s.l1.PendingNonceAt(t.Ctx(), s.batcherAddr) require.NoError(t, err, "need batcher nonce") @@ -247,7 +271,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic To: &s.rollupCfg.BatchInboxAddress, GasTipCap: gasTipCap, GasFeeCap: gasFeeCap, - Data: data.Bytes(), + Data: payload, } for _, opt := range txOpts { opt(rawTx) @@ -259,7 +283,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing, txOpts ...func(tx *types.Dynamic txData = rawTx } else if s.l2BatcherCfg.DataAvailabilityType == batcherFlags.BlobsType { var b eth.Blob - require.NoError(t, b.FromData(data.Bytes()), "must turn data into blob") + require.NoError(t, b.FromData(payload), "must turn data into blob") sidecar, blobHashes, err := txmgr.MakeSidecar([]*eth.Blob{&b}) require.NoError(t, err) require.NotNil(t, pendingHeader.ExcessBlobGas, "need L1 header with 4844 properties") diff --git a/op-e2e/actions/l2_sequencer.go b/op-e2e/actions/l2_sequencer.go index c55dcfc45ba5..c1c93cf74dec 100644 --- a/op-e2e/actions/l2_sequencer.go +++ b/op-e2e/actions/l2_sequencer.go @@ -44,8 +44,8 @@ type L2Sequencer struct { } func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc derive.L1BlobsFetcher, - eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { - ver := NewL2Verifier(t, log, l1, blobSrc, eng, cfg, &sync.Config{}, safedb.Disabled) + plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { + ver := NewL2Verifier(t, log, l1, blobSrc, plasmaSrc, eng, cfg, &sync.Config{}, safedb.Disabled) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng) seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1) l1OriginSelector := &MockL1OriginSelector{ diff --git a/op-e2e/actions/l2_sequencer_test.go b/op-e2e/actions/l2_sequencer_test.go index 9c1a12db738f..1f4a766ba504 100644 --- a/op-e2e/actions/l2_sequencer_test.go +++ 
b/op-e2e/actions/l2_sequencer_test.go @@ -47,7 +47,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) - sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), l2Cl, sd.RollupCfg, 0) + sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), nil, l2Cl, sd.RollupCfg, 0) return miner, engine, sequencer } diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index 3c61c628c9ff..7c0af8ebf6d6 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -63,10 +63,10 @@ type safeDB interface { node.SafeDBReader } -func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier { +func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc derive.PlasmaInputFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier { metrics := &testutils.TestDerivationMetrics{} engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode) - pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, nil, eng, engine, metrics, syncCfg, safeHeadListener) + pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, syncCfg, safeHeadListener) pipeline.Reset() rollupNode := &L2Verifier{ diff --git a/op-e2e/actions/l2_verifier_test.go b/op-e2e/actions/l2_verifier_test.go index 4819d4387cf8..7f2fa2348568 100644 --- a/op-e2e/actions/l2_verifier_test.go +++ b/op-e2e/actions/l2_verifier_test.go @@ -40,7 +40,7 @@ func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive jwtPath := e2eutils.WriteDefaultJWT(t) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P()) engCl := engine.EngineClient(t, sd.RollupCfg) - verifier := NewL2Verifier(t, log, l1F, blobSrc, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener) + verifier := NewL2Verifier(t, log, l1F, blobSrc, nil, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener) return engine, verifier } diff --git a/op-e2e/actions/plasma_test.go b/op-e2e/actions/plasma_test.go new file mode 100644 index 000000000000..a58572af055d --- /dev/null +++ b/op-e2e/actions/plasma_test.go @@ -0,0 +1,411 @@ +package actions + +import ( + "context" + "math/big" + "math/rand" + "testing" + + "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + "github.com/ethereum-optimism/optimism/op-node/node/safedb" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + plasma "github.com/ethereum-optimism/optimism/op-plasma" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +// L2PlasmaDA is a test harness for manipulating plasma DA state. 
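+// It wires together an L1 miner, an L2 engine and sequencer, a plasma-mode batcher, a mock DA
+// storage (DAErrFaker) behind the plasma DA manager, and bindings to the DataAvailabilityChallenge
+// contract, so tests can submit commitments and then challenge, resolve or expire them.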
+type L2PlasmaDA struct { + log log.Logger + storage *plasma.DAErrFaker + daMgr *plasma.DA + plasmaCfg plasma.Config + contract *bindings.DataAvailabilityChallenge + batcher *L2Batcher + sequencer *L2Sequencer + engine *L2Engine + engCl *sources.EngineClient + sd *e2eutils.SetupData + dp *e2eutils.DeployParams + miner *L1Miner + alice *CrossLayerUser + lastComm []byte + lastCommBn uint64 +} + +func NewL2AltDA(log log.Logger, p *e2eutils.TestParams, t Testing) *L2PlasmaDA { + dp := e2eutils.MakeDeployParams(t, p) + sd := e2eutils.Setup(t, dp, defaultAlloc) + + require.True(t, sd.RollupCfg.UsePlasma) + + miner := NewL1Miner(t, log, sd.L1Cfg) + l1Client := miner.EthClient() + + jwtPath := e2eutils.WriteDefaultJWT(t) + engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) + engCl := engine.EngineClient(t, sd.RollupCfg) + + storage := &plasma.DAErrFaker{Client: plasma.NewMockDAClient(log)} + + l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic)) + require.NoError(t, err) + + plasmaCfg, err := sd.RollupCfg.PlasmaConfig() + require.NoError(t, err) + + daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, l1F, &plasma.NoopMetrics{}) + + sequencer := NewL2Sequencer(t, log, l1F, nil, daMgr, engCl, sd.RollupCfg, 0) + miner.ActL1SetFeeRecipient(common.Address{'A'}) + sequencer.ActL2PipelineFull(t) + + daMgr.OnFinalizedHeadSignal(func(ctx context.Context, ref eth.L1BlockRef) { + sequencer.derivation.Finalize(ref) + }) + + batcher := NewL2Batcher(log, sd.RollupCfg, PlasmaBatcherCfg(dp, storage), sequencer.RollupClient(), l1Client, engine.EthClient(), engCl) + + addresses := e2eutils.CollectAddresses(sd, dp) + cl := engine.EthClient() + l2UserEnv := &BasicUserEnv[*L2Bindings]{ + EthCl: cl, + Signer: types.LatestSigner(sd.L2Cfg.Config), + AddressCorpora: addresses, + Bindings: NewL2Bindings(t, cl, engine.GethClient()), + } + alice := NewCrossLayerUser(log, dp.Secrets.Alice, rand.New(rand.NewSource(0xa57b))) + alice.L2.SetUserEnv(l2UserEnv) + + contract, err := bindings.NewDataAvailabilityChallenge(sd.RollupCfg.DAChallengeAddress, l1Client) + require.NoError(t, err) + + challengeWindow, err := contract.ChallengeWindow(nil) + require.NoError(t, err) + require.Equal(t, plasmaCfg.ChallengeWindow, challengeWindow.Uint64()) + + resolveWindow, err := contract.ResolveWindow(nil) + require.NoError(t, err) + require.Equal(t, plasmaCfg.ResolveWindow, resolveWindow.Uint64()) + + return &L2PlasmaDA{ + log: log, + storage: storage, + daMgr: daMgr, + plasmaCfg: plasmaCfg, + contract: contract, + batcher: batcher, + sequencer: sequencer, + engine: engine, + engCl: engCl, + sd: sd, + dp: dp, + miner: miner, + alice: alice, + } +} + +func (a *L2PlasmaDA) StorageClient() *plasma.DAErrFaker { + return a.storage +} + +func (a *L2PlasmaDA) NewVerifier(t Testing) *L2Verifier { + jwtPath := e2eutils.WriteDefaultJWT(t) + engine := NewL2Engine(t, a.log, a.sd.L2Cfg, a.sd.RollupCfg.Genesis.L1, jwtPath) + engCl := engine.EngineClient(t, a.sd.RollupCfg) + l1F, err := sources.NewL1Client(a.miner.RPCClient(), a.log, nil, sources.L1ClientDefaultConfig(a.sd.RollupCfg, false, sources.RPCKindBasic)) + require.NoError(t, err) + + daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, l1F, &plasma.NoopMetrics{}) + + verifier := NewL2Verifier(t, a.log, l1F, nil, daMgr, engCl, a.sd.RollupCfg, &sync.Config{}, safedb.Disabled) + daMgr.OnFinalizedHeadSignal(func(ctx context.Context, ref eth.L1BlockRef) { + 
verifier.derivation.Finalize(ref) + }) + + return verifier +} + +func (a *L2PlasmaDA) ActSequencerIncludeTx(t Testing) { + a.alice.L2.ActResetTxOpts(t) + a.alice.L2.ActSetTxToAddr(&a.dp.Addresses.Bob)(t) + a.alice.L2.ActMakeTx(t) + + a.sequencer.ActL2PipelineFull(t) + + a.sequencer.ActL2StartBlock(t) + a.engine.ActL2IncludeTx(a.alice.Address())(t) + a.sequencer.ActL2EndBlock(t) +} + +func (a *L2PlasmaDA) ActNewL2Tx(t Testing) { + a.ActSequencerIncludeTx(t) + + a.batcher.ActL2BatchBuffer(t) + a.batcher.ActL2ChannelClose(t) + a.batcher.ActL2BatchSubmit(t, func(tx *types.DynamicFeeTx) { + a.lastComm = tx.Data + }) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t) + a.miner.ActL1EndBlock(t) + + a.lastCommBn = a.miner.l1Chain.CurrentBlock().Number.Uint64() +} + +func (a *L2PlasmaDA) ActDeleteLastInput(t Testing) { + a.storage.Client.DeleteData(a.lastComm) +} + +func (a *L2PlasmaDA) ActChallengeLastInput(t Testing) { + a.ActChallengeInput(t, a.lastComm, a.lastCommBn) + + a.log.Info("challenged last input", "block", a.lastCommBn) +} + +func (a *L2PlasmaDA) ActChallengeInput(t Testing, comm []byte, bn uint64) { + bondValue, err := a.contract.BondSize(&bind.CallOpts{}) + require.NoError(t, err) + + txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) + require.NoError(t, err) + + txOpts.Value = bondValue + _, err = a.contract.Deposit(txOpts) + require.NoError(t, err) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.alice.Address())(t) + a.miner.ActL1EndBlock(t) + + txOpts, err = bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) + require.NoError(t, err) + + _, err = a.contract.Challenge(txOpts, big.NewInt(int64(bn)), comm) + require.NoError(t, err) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.alice.Address())(t) + a.miner.ActL1EndBlock(t) +} + +func (a *L2PlasmaDA) ActExpireLastInput(t Testing) { + reorgWindow := a.plasmaCfg.ResolveWindow + a.plasmaCfg.ChallengeWindow + for a.miner.l1Chain.CurrentBlock().Number.Uint64() <= a.lastCommBn+reorgWindow { + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1EndBlock(t) + } +} + +func (a *L2PlasmaDA) ActResolveLastChallenge(t Testing) { + input, err := a.storage.GetInput(t.Ctx(), a.lastComm) + require.NoError(t, err) + + txOpts, err := bind.NewKeyedTransactorWithChainID(a.dp.Secrets.Alice, a.sd.L1Cfg.Config.ChainID) + + _, err = a.contract.Resolve(txOpts, big.NewInt(int64(a.lastCommBn)), a.lastComm, input) + require.NoError(t, err) + + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1IncludeTx(a.alice.Address())(t) + a.miner.ActL1EndBlock(t) +} + +func (a *L2PlasmaDA) ActL1Blocks(t Testing, n uint64) { + for i := uint64(0); i < n; i++ { + a.miner.ActL1StartBlock(3)(t) + a.miner.ActL1EndBlock(t) + } +} + +func (a *L2PlasmaDA) GetLastTxBlock(t Testing) *types.Block { + rcpt, err := a.engine.EthClient().TransactionReceipt(t.Ctx(), a.alice.L2.lastTxHash) + require.NoError(t, err) + blk, err := a.engine.EthClient().BlockByHash(t.Ctx(), rcpt.BlockHash) + require.NoError(t, err) + return blk +} + +// Commitment is challenged but never resolved, chain reorgs when challenge window expires. +func TestPlasma_ChallengeExpired(gt *testing.T) { + t := NewDefaultTesting(gt) + + p := &e2eutils.TestParams{ + MaxSequencerDrift: 2, + SequencerWindowSize: 4, + ChannelTimeout: 4, + L1BlockTime: 3, + } + + log := testlog.Logger(t, log.LevelDebug) + + harness := NewL2AltDA(log, p, t) + + // generate enough initial l1 blocks to have a finalized head. 
+ harness.ActL1Blocks(t, 5) + + // Include a new l2 transaction, submitting an input commitment to the l1. + harness.ActNewL2Tx(t) + + // Challenge the input commitment on the l1 challenge contract. + harness.ActChallengeLastInput(t) + + blk := harness.GetLastTxBlock(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + harness.sequencer.ActL2PipelineFull(t) + + // create enough l1 blocks to expire the resolve window. + harness.ActExpireLastInput(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + harness.sequencer.ActL2PipelineFull(t) + + // make sure that the finalized head was correctly updated on the engine. + l2Finalized, err := harness.engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized) + require.NoError(t, err) + require.Equal(t, uint64(8), l2Finalized.Number) + + newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number()) + require.NoError(t, err) + + // reorg happened even though data was available + require.NotEqual(t, blk.Hash(), newBlk.Hash()) + + // now delete the data from the storage service so it is not available at all + // to the verifier derivation pipeline. + harness.ActDeleteLastInput(t) + + syncStatus := harness.sequencer.SyncStatus() + + // verifier is able to sync with expired missing data + verifier := harness.NewVerifier(t) + verifier.ActL2PipelineFull(t) + + verifSyncStatus := verifier.SyncStatus() + + require.Equal(t, syncStatus.FinalizedL2, verifSyncStatus.FinalizedL2) +} + +// Commitment is challenged after sequencer derived the chain but data disappears. A verifier +// derivation pipeline stalls until the challenge is resolved and then resumes with data from the contract. +func TestPlasma_ChallengeResolved(gt *testing.T) { + t := NewDefaultTesting(gt) + p := &e2eutils.TestParams{ + MaxSequencerDrift: 2, + SequencerWindowSize: 4, + ChannelTimeout: 4, + L1BlockTime: 3, + } + log := testlog.Logger(t, log.LvlDebug) + harness := NewL2AltDA(log, p, t) + + // include a new l2 transaction, submitting an input commitment to the l1. + harness.ActNewL2Tx(t) + + // generate 3 l1 blocks. + harness.ActL1Blocks(t, 3) + + // challenge the input commitment for that l2 transaction on the l1 challenge contract. + harness.ActChallengeLastInput(t) + + // catch up sequencer derivatio pipeline. + // this syncs the latest event within the AltDA manager. + harness.sequencer.ActL2PipelineFull(t) + + // resolve the challenge on the l1 challenge contract. + harness.ActResolveLastChallenge(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + // this syncs the resolved status and input data within the AltDA manager. + harness.sequencer.ActL2PipelineFull(t) + + // delete the data from the storage service so it is not available at all + // to the verifier derivation pipeline. + harness.ActDeleteLastInput(t) + + syncStatus := harness.sequencer.SyncStatus() + + // new verifier is able to sync and resolve the input from calldata + verifier := harness.NewVerifier(t) + verifier.ActL2PipelineFull(t) + + verifSyncStatus := verifier.SyncStatus() + + require.Equal(t, syncStatus.SafeL2, verifSyncStatus.SafeL2) +} + +// DA storage service goes offline while sequencer keeps making blocks. When storage comes back online, it should be able to catch up. 
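+// The harness uses the DAErrFaker storage to make the next GetInput call fail (ActGetPreImageFail),
+// so derivation surfaces a temporary error and then succeeds on the retry once storage recovers.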
+func TestAltDA_StorageError(gt *testing.T) { + t := NewDefaultTesting(gt) + p := &e2eutils.TestParams{ + MaxSequencerDrift: 2, + SequencerWindowSize: 4, + ChannelTimeout: 4, + L1BlockTime: 3, + } + log := testlog.Logger(t, log.LvlDebug) + harness := NewL2AltDA(log, p, t) + + // include a new l2 transaction, submitting an input commitment to the l1. + harness.ActNewL2Tx(t) + + txBlk := harness.GetLastTxBlock(t) + + // mock a storage client error when trying to get the pre-image. + // this simulates the storage service going offline for example. + harness.storage.ActGetPreImageFail() + + // try to derive the l2 chain from the submitted inputs commitments. + // the storage call will fail the first time then succeed. + harness.sequencer.ActL2PipelineFull(t) + + // sequencer derivation was able to sync to latest l1 origin + syncStatus := harness.sequencer.SyncStatus() + require.Equal(t, uint64(1), syncStatus.SafeL2.Number) + require.Equal(t, txBlk.Hash(), syncStatus.SafeL2.Hash) +} + +// Commitment is challenged but with a wrong block number. +func TestAltDA_ChallengeBadBlockNumber(gt *testing.T) { + t := NewDefaultTesting(gt) + p := &e2eutils.TestParams{ + MaxSequencerDrift: 2, + SequencerWindowSize: 4, + ChannelTimeout: 4, + L1BlockTime: 3, + } + log := testlog.Logger(t, log.LvlDebug) + harness := NewL2AltDA(log, p, t) + + // generate 3 blocks of l1 chain + harness.ActL1Blocks(t, 3) + + // include a new transaction on l2 + harness.ActNewL2Tx(t) + + // move the l1 chain so the challenge window expires + harness.ActExpireLastInput(t) + + // catch up derivation + harness.sequencer.ActL2PipelineFull(t) + + // challenge the input but with a wrong block number + // in the current challenge window + harness.ActChallengeInput(t, harness.lastComm, 14) + + // catch up derivation + harness.sequencer.ActL2PipelineFull(t) + + // da mgr should not have save the challenge + found := harness.daMgr.State().IsTracking(harness.lastComm, 14) + require.False(t, found) +} diff --git a/op-e2e/actions/reorg_test.go b/op-e2e/actions/reorg_test.go index 262ca842c1d3..f1fa7a67d952 100644 --- a/op-e2e/actions/reorg_test.go +++ b/op-e2e/actions/reorg_test.go @@ -617,7 +617,7 @@ func RestartOpGeth(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { engRpc := &rpcWrapper{seqEng.RPCClient()} l2Cl, err := sources.NewEngineClient(engRpc, log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) - sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), l2Cl, sd.RollupCfg, 0) + sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), nil, l2Cl, sd.RollupCfg, 0) batcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp), sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg)) @@ -705,7 +705,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { require.NoError(t, err) l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard)) require.NoError(t, err) - altSequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), altSeqEngCl, sd.RollupCfg, 0) + altSequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), nil, altSeqEngCl, sd.RollupCfg, 0) altBatcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp), altSequencer.RollupClient(), miner.EthClient(), altSeqEng.EthClient(), altSeqEng.EngineClient(t, sd.RollupCfg)) diff --git a/op-e2e/e2eutils/setup.go b/op-e2e/e2eutils/setup.go index b7e02ddd1feb..32ade2a84390 100644 --- 
a/op-e2e/e2eutils/setup.go +++ b/op-e2e/e2eutils/setup.go @@ -161,6 +161,10 @@ func Setup(t require.TestingT, deployParams *DeployParams, alloc *AllocParams) * EcotoneTime: deployConf.EcotoneTime(uint64(deployConf.L1GenesisBlockTimestamp)), FjordTime: deployConf.FjordTime(uint64(deployConf.L1GenesisBlockTimestamp)), InteropTime: deployConf.InteropTime(uint64(deployConf.L1GenesisBlockTimestamp)), + DAChallengeAddress: l1Deployments.DataAvailabilityChallengeProxy, + DAChallengeWindow: deployConf.DaChallengeWindow, + DAResolveWindow: deployConf.DaResolveWindow, + UsePlasma: deployConf.UsePlasma, } require.NoError(t, rollupCfg.Check()) diff --git a/op-node/metrics/metrics.go b/op-node/metrics/metrics.go index 999f1b3edae4..81a16e462585 100644 --- a/op-node/metrics/metrics.go +++ b/op-node/metrics/metrics.go @@ -10,6 +10,8 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum-optimism/optimism/op-node/p2p/store" + + // plasma "github.com/ethereum-optimism/optimism/op-plasma" ophttp "github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -122,6 +124,8 @@ type Metrics struct { TransactionsSequencedTotal prometheus.Counter + // PlasmaMetrics plasma.Metricer + // Channel Bank Metrics headChannelOpenedEvent *metrics.Event channelTimedOutEvent *metrics.Event @@ -384,6 +388,8 @@ func NewMetrics(procName string) *Metrics { "required", }), + // PlasmaMetrics: plasma.MakeMetrics(ns, factory), + registry: registry, factory: factory, } diff --git a/op-node/node/node.go b/op-node/node/node.go index 7f8fad5adddc..dacea23e0f8f 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -203,8 +203,10 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { // which only change once per epoch at most and may be delayed. n.l1SafeSub = eth.PollBlockChanges(n.log, n.l1Source, n.OnNewL1Safe, eth.Safe, cfg.L1EpochPollInterval, time.Second*10) - n.l1FinalizedSub = eth.PollBlockChanges(n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized, - cfg.L1EpochPollInterval, time.Second*10) + if !cfg.Plasma.Enabled { + n.l1FinalizedSub = eth.PollBlockChanges(n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized, + cfg.L1EpochPollInterval, time.Second*10) + } return nil } @@ -386,9 +388,17 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger sequencerConductor = NewConductorClient(cfg, n.log, n.metrics) } - plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma) + // if plasma is not explicitly activated in the node CLI, the config + any error will be ignored. + rpCfg, err := cfg.Rollup.PlasmaConfig() + if cfg.Plasma.Enabled && err != nil { + return fmt.Errorf("failed to get plasma config: %w", err) + } + plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.l1Source, &plasma.NoopMetrics{}) if cfg.Plasma.Enabled { n.log.Info("Plasma DA enabled", "da_server", cfg.Plasma.DAServerURL) + // Plasma takes control of the engine finalization signal callback only when enabled + // on the CLI. 
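+		// The L1 finality poller is skipped in initL1 when plasma is enabled, so this callback
+		// becomes the node's source of finalized head updates.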
+ plasmaDA.OnFinalizedHeadSignal(n.OnNewL1Finalized) } if cfg.SafeDBPath != "" { n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath) diff --git a/op-node/rollup/derive/data_source.go b/op-node/rollup/derive/data_source.go index 705601306c14..2d232e34676f 100644 --- a/op-node/rollup/derive/data_source.go +++ b/op-node/rollup/derive/data_source.go @@ -9,7 +9,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-node/rollup" - plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -28,7 +27,9 @@ type L1BlobsFetcher interface { type PlasmaInputFetcher interface { // GetInput fetches the input for the given commitment at the given block number from the DA storage service. - GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (plasma.Input, error) + GetInput(ctx context.Context, commitment []byte, blockId eth.BlockID) (eth.Data, error) + // AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events. + AdvanceL1Origin(ctx context.Context, blockId eth.BlockID) error } // DataSourceFactory reads raw transactions from a given block & then filters for diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index b99a7914df6d..4644424bce0a 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -179,7 +179,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engin ec: engine, engine: l2Source, metrics: metrics, - finalityData: make([]FinalityData, 0, finalityLookback), + finalityData: make([]FinalityData, 0, cfg.FinalityLookback()), unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize), prev: prev, l1Fetcher: l1Fetcher, @@ -405,8 +405,8 @@ func (eq *EngineQueue) postProcessSafeL2() error { return err } // prune finality data if necessary - if len(eq.finalityData) >= finalityLookback { - eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) + if uint64(len(eq.finalityData)) >= eq.cfg.FinalityLookback() { + eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:eq.cfg.FinalityLookback()]...) } // remember the last L2 block that we fully derived from the given finality data if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number { diff --git a/op-node/rollup/derive/plasma_data_source.go b/op-node/rollup/derive/plasma_data_source.go index 9c4dec938110..15a7648cf1ee 100644 --- a/op-node/rollup/derive/plasma_data_source.go +++ b/op-node/rollup/derive/plasma_data_source.go @@ -2,8 +2,10 @@ package derive import ( "context" + "errors" "fmt" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/log" ) @@ -29,6 +31,17 @@ func NewPlasmaDataSource(log log.Logger, src DataIter, fetcher PlasmaInputFetche } func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { + // Process origin syncs the challenge contract events and updates the local challenge states + // before we can proceed to fetch the input data. This function can be called multiple times + // for the same origin and noop if the origin was already processed. It is also called if + // there is not commitment in the current origin. 
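+	// A reorg signal from the DA manager maps to a pipeline reset below; any other failure is
+	// retried as a temporary error.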
+ if err := s.fetcher.AdvanceL1Origin(ctx, s.id); err != nil { + if errors.Is(err, plasma.ErrReorgRequired) { + return nil, NewResetError(fmt.Errorf("new expired challenge")) + } + return nil, NewTemporaryError(fmt.Errorf("failed to advance plasma L1 origin: %w", err)) + } + if s.comm == nil { var err error // the l1 source returns the input commitment for the batch. @@ -38,12 +51,25 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { } } // use the commitment to fetch the input from the plasma DA provider. - resp, err := s.fetcher.GetInput(ctx, s.comm, s.id.Number) - if err != nil { + data, err := s.fetcher.GetInput(ctx, s.comm, s.id) + // GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager + // continued syncing origins detached from the pipeline origin. + if errors.Is(err, plasma.ErrReorgRequired) { + // challenge for a new previously derived commitment expired. + return nil, NewResetError(err) + } else if errors.Is(err, plasma.ErrExpiredChallenge) { + // this commitment was challenged and the challenge expired. + s.log.Warn("challenge expired, skipping batch", "comm", fmt.Sprintf("%x", s.comm)) + s.comm = nil + // skip the input + return s.Next(ctx) + } else if errors.Is(err, plasma.ErrMissingPastWindow) { + return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err)) + } else if err != nil { // return temporary error so we can keep retrying. return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err)) } // reset the commitment so we can fetch the next one from the source at the next iteration. s.comm = nil - return resp.Data, nil + return data, nil } diff --git a/op-node/rollup/derive/plasma_data_source_test.go b/op-node/rollup/derive/plasma_data_source_test.go index dc96a4390b72..4db21f9a86eb 100644 --- a/op-node/rollup/derive/plasma_data_source_test.go +++ b/op-node/rollup/derive/plasma_data_source_test.go @@ -12,17 +12,35 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +type MockFinalitySignal struct { + mock.Mock +} + +func (m *MockFinalitySignal) OnNewL1Finalized(ctx context.Context, blockRef eth.L1BlockRef) { + m.MethodCalled("OnNewL1Finalized", blockRef) +} + +func (m *MockFinalitySignal) ExpectL1Finalized(blockRef eth.L1BlockRef) { + m.On("OnNewL1Finalized", blockRef).Once() +} + // TestPlasmaDataSource verifies that commitments are correctly read from l1 and then // forwarded to the Plasma DA to return the correct inputs in the iterator. +// First it generates some L1 refs containing a random number of commitments, challenges +// the first 4 commitments then generates enough blocks to expire the challenge. +// Then it simulates rederiving while verifying it does skip the expired input until the next +// challenge expires. 
func TestPlasmaDataSource(t *testing.T) { logger := testlog.Logger(t, log.LevelDebug) ctx := context.Background() @@ -33,7 +51,14 @@ func TestPlasmaDataSource(t *testing.T) { storage := plasma.NewMockDAClient(logger) - da := plasma.NewPlasmaDAWithStorage(logger, storage) + pcfg := plasma.Config{ + ChallengeWindow: 90, ResolveWindow: 90, + } + + da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, l1F, &plasma.NoopMetrics{}) + + finalitySignal := &MockFinalitySignal{} + da.OnFinalizedHeadSignal(finalitySignal.OnNewL1Finalized) // Create rollup genesis and config l1Time := uint64(2) @@ -64,12 +89,15 @@ func TestPlasmaDataSource(t *testing.T) { } // keep track of random input data to validate against var inputs [][]byte + var comms [][]byte signer := cfg.L1Signer() factory := NewDataSourceFactory(logger, cfg, l1F, nil, da) - for i := uint64(0); i <= 18; i++ { + nc := 0 + + for i := uint64(0); i <= pcfg.ChallengeWindow+pcfg.ResolveWindow; i++ { parent := l1Refs[len(l1Refs)-1] // create a new mock l1 ref ref := eth.L1BlockRef{ @@ -80,6 +108,8 @@ func TestPlasmaDataSource(t *testing.T) { } l1Refs = append(l1Refs, ref) logger.Info("new l1 block", "ref", ref) + // called for each l1 block to sync challenges + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) // pick a random number of commitments to include in the l1 block c := rng.Intn(4) @@ -90,6 +120,7 @@ func TestPlasmaDataSource(t *testing.T) { input := testutils.RandomData(rng, 2000) comm, _ := storage.SetInput(ctx, input) inputs = append(inputs, input) + comms = append(comms, comm) tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ ChainID: signer.ChainID(), @@ -104,13 +135,39 @@ func TestPlasmaDataSource(t *testing.T) { require.NoError(t, err) txs = append(txs, tx) + } logger.Info("included commitments", "count", c) l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + // called once per derivation + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + + if ref.Number == 2 { + l1F.ExpectL1BlockRefByNumber(ref.Number, ref, nil) + finalitySignal.ExpectL1Finalized(ref) + } + + // challenge the first 4 commitments as soon as we have collected them all + if len(comms) >= 4 && nc < 7 { + // skip a block between each challenge transaction + if nc%2 == 0 { + da.State().SetActiveChallenge(comms[nc/2], ref.Number, pcfg.ResolveWindow) + logger.Info("setting active challenge", "comm", comms[nc/2]) + } + nc++ + } // create a new data source for each block src, err := factory.OpenData(ctx, ref, batcherAddr) require.NoError(t, err) + + // first challenge expires + if i == 95 { + _, err := src.Next(ctx) + require.ErrorIs(t, err, ErrReset) + break + } + for j := 0; j < c; j++ { data, err := src.Next(ctx) // check that each commitment is resolved @@ -121,4 +178,194 @@ func TestPlasmaDataSource(t *testing.T) { _, err = src.Next(ctx) require.ErrorIs(t, err, io.EOF) } + + logger.Info("pipeline reset ..................................") + + // start at 1 since first input should be skipped + nc = 1 + + for i := 1; i <= len(l1Refs)+2; i++ { + + var ref eth.L1BlockRef + // first we run through all the existing l1 blocks + if i < len(l1Refs) { + ref = l1Refs[i] + logger.Info("re deriving block", "ref", ref, "i", i) + + if i == len(l1Refs)-1 { + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + } + // once past the l1 head, continue generating new l1 refs + } else { + parent := l1Refs[len(l1Refs)-1] + // create a new mock l1 ref + ref = eth.L1BlockRef{ + Hash: 
testutils.RandomHash(rng), + Number: parent.Number + 1, + ParentHash: parent.Hash, + Time: parent.Time + l1Time, + } + l1Refs = append(l1Refs, ref) + logger.Info("new l1 block", "ref", ref) + // called for each l1 block to sync challenges + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + + // pick a random number of commitments to include in the l1 block + c := rng.Intn(4) + var txs []*types.Transaction + + for j := 0; j < c; j++ { + // mock input commitments in l1 transactions + input := testutils.RandomData(rng, 2000) + comm, _ := storage.SetInput(ctx, input) + inputs = append(inputs, input) + comms = append(comms, comm) + + tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: comm, + }) + require.NoError(t, err) + + txs = append(txs, tx) + + } + logger.Info("included commitments", "count", c) + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + } + + // create a new data source for each block + src, err := factory.OpenData(ctx, ref, batcherAddr) + require.NoError(t, err) + + // next challenge expires + if i == 98 { + _, err := src.Next(ctx) + require.ErrorIs(t, err, ErrReset) + break + } + + for data, err := src.Next(ctx); err != io.EOF; data, err = src.Next(ctx) { + logger.Info("yielding data") + // check that each commitment is resolved + require.NoError(t, err) + require.Equal(t, hexutil.Bytes(inputs[nc]), data) + + nc++ + } + + } + finalitySignal.AssertExpectations(t) + l1F.AssertExpectations(t) +} + +// This tests makes sure the pipeline returns a temporary error if data is not found. 
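+// Once the challenge is marked resolved with the original input, the same data source resumes
+// and serves the input from the resolved challenge state.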
+func TestPlasmaDataSourceStall(t *testing.T) { + logger := testlog.Logger(t, log.LevelDebug) + ctx := context.Background() + + rng := rand.New(rand.NewSource(1234)) + + l1F := &testutils.MockL1Source{} + + storage := plasma.NewMockDAClient(logger) + + pcfg := plasma.Config{ + ChallengeWindow: 90, ResolveWindow: 90, + } + + da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, l1F, &plasma.NoopMetrics{}) + + finalitySignal := &MockFinalitySignal{} + da.OnFinalizedHeadSignal(finalitySignal.OnNewL1Finalized) + + // Create rollup genesis and config + l1Time := uint64(2) + refA := testutils.RandomBlockRef(rng) + refA.Number = 1 + l1Refs := []eth.L1BlockRef{refA} + refA0 := eth.L2BlockRef{ + Hash: testutils.RandomHash(rng), + Number: 0, + ParentHash: common.Hash{}, + Time: refA.Time, + L1Origin: refA.ID(), + SequenceNumber: 0, + } + batcherPriv := testutils.RandomKey() + batcherAddr := crypto.PubkeyToAddress(batcherPriv.PublicKey) + batcherInbox := common.Address{42} + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: refA.ID(), + L2: refA0.ID(), + L2Time: refA0.Time, + }, + BlockTime: 1, + SeqWindowSize: 20, + BatchInboxAddress: batcherInbox, + DAChallengeAddress: common.Address{43}, + } + + signer := cfg.L1Signer() + + factory := NewDataSourceFactory(logger, cfg, l1F, nil, da) + + parent := l1Refs[0] + // create a new mock l1 ref + ref := eth.L1BlockRef{ + Hash: testutils.RandomHash(rng), + Number: parent.Number + 1, + ParentHash: parent.Hash, + Time: parent.Time + l1Time, + } + l1F.ExpectFetchReceipts(ref.Hash, nil, types.Receipts{}, nil) + // mock input commitments in l1 transactions + input := testutils.RandomData(rng, 2000) + comm, _ := storage.SetInput(ctx, input) + + tx, err := types.SignNewTx(batcherPriv, signer, &types.DynamicFeeTx{ + ChainID: signer.ChainID(), + Nonce: 0, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: big.NewInt(30 * params.GWei), + Gas: 100_000, + To: &batcherInbox, + Value: big.NewInt(int64(0)), + Data: comm, + }) + require.NoError(t, err) + + txs := []*types.Transaction{tx} + + l1F.ExpectInfoAndTxsByHash(ref.Hash, testutils.RandomBlockInfo(rng), txs, nil) + + // delete the input from the DA provider so it returns not found + storage.DeleteData(comm) + + // next block is fetched to look ahead challenges but is not yet available + l1F.ExpectL1BlockRefByNumber(ref.Number+1, eth.L1BlockRef{}, ethereum.NotFound) + + src, err := factory.OpenData(ctx, ref, batcherAddr) + require.NoError(t, err) + + // data is not found so we return a temporary error + _, err = src.Next(ctx) + require.ErrorIs(t, err, ErrTemporary) + + // now challenge is resolved + da.State().SetResolvedChallenge(comm, input, ref.Number+2) + + // derivation can resume + data, err := src.Next(ctx) + require.NoError(t, err) + require.Equal(t, hexutil.Bytes(input), data) + + l1F.AssertExpectations(t) } diff --git a/op-node/rollup/sync/start.go b/op-node/rollup/sync/start.go index 5023ed83c53e..3a3fbcca8159 100644 --- a/op-node/rollup/sync/start.go +++ b/op-node/rollup/sync/start.go @@ -176,7 +176,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain if result.Unsafe == (eth.L2BlockRef{}) { result.Unsafe = n // Check we are not reorging L2 incredibly deep - if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number { + if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.ReorgWindowSize()) < prevUnsafe.L1Origin.Number { // If the reorg depth is too large, something is fishy. // This can legitimately happen if L1 goes down for a while. 
But in that case, // restarting the L2 node with a bigger configured MaxReorgDepth is an acceptable @@ -201,7 +201,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain } // If the L2 block is at least as old as the previous safe head, and we have seen at least a full sequence window worth of L1 blocks to confirm - if n.Number <= result.Safe.Number && n.L1Origin.Number+cfg.SeqWindowSize < highestL2WithCanonicalL1Origin.L1Origin.Number && n.SequenceNumber == 0 { + if n.Number <= result.Safe.Number && n.L1Origin.Number+cfg.ReorgWindowSize() < highestL2WithCanonicalL1Origin.L1Origin.Number && n.SequenceNumber == 0 { ready = true } diff --git a/op-node/rollup/types.go b/op-node/rollup/types.go index 84aa03712859..24544f366f09 100644 --- a/op-node/rollup/types.go +++ b/op-node/rollup/types.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + plasma "github.com/ethereum-optimism/optimism/op-plasma" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -113,6 +114,17 @@ type Config struct { // L1 DataAvailabilityChallenge contract proxy address DAChallengeAddress common.Address `json:"da_challenge_address,omitempty"` + + // DA challenge window value set on the DAC contract. Used in plasma mode + // to compute when a commitment can no longer be challenged. + DAChallengeWindow uint64 `json:"da_challenge_window"` + + // DA resolve window value set on the DAC contract. Used in plasma mode + // to compute when a challenge expires and trigger a reorg if needed. + DAResolveWindow uint64 `json:"da_resolve_window"` + + // UsePlasma is activated when the chain is in plasma mode. + UsePlasma bool `json:"use_plasma"` } // ValidateL1Config checks L1 config variables for errors. @@ -401,6 +413,40 @@ func (c *Config) IsPlasmaEnabled() bool { return c.DAChallengeAddress != (common.Address{}) } +// PlasmaConfig validates and returns the plasma config from the rollup config. +func (c *Config) PlasmaConfig() (plasma.Config, error) { + if c.DAChallengeAddress == (common.Address{}) { + return plasma.Config{}, fmt.Errorf("missing DAChallengeAddress") + } + if c.DAChallengeWindow == uint64(0) { + return plasma.Config{}, fmt.Errorf("missing DAChallengeWindow") + } + if c.DAResolveWindow == uint64(0) { + return plasma.Config{}, fmt.Errorf("missing DAResolveWindow") + } + return plasma.Config{ + DAChallengeContractAddress: c.DAChallengeAddress, + ChallengeWindow: c.DAChallengeWindow, + ResolveWindow: c.DAResolveWindow, + }, nil +} + +// FinalityLookback computes the number of l2 blocks to keep track of for finality. +func (c *Config) FinalityLookback() uint64 { + if c.UsePlasma { + return c.DAChallengeWindow + c.DAResolveWindow + } + return 4*32 + 1 +} + +// ReorgWindowSize computes the max number of blocks to walk back in case of reset. +func (c *Config) ReorgWindowSize() uint64 { + if c.UsePlasma { + return c.DAChallengeWindow + c.DAResolveWindow + c.SeqWindowSize + } + return c.SeqWindowSize +} + // Description outputs a banner describing the important parts of rollup configuration in a human-readable form. // Optionally provide a mapping of L2 chain IDs to network names to label the L2 chain with if not unknown. // The config should be config.Check()-ed before creating a description. 
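For orientation, here is a minimal sketch of how the new rollup.Config helpers added below behave in plasma mode; the field values and contract address are illustrative only, not taken from a real deploy config:

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum/go-ethereum/common"
)

func main() {
	cfg := &rollup.Config{
		SeqWindowSize:      3600,
		UsePlasma:          true,
		DAChallengeAddress: common.HexToAddress("0xda"), // hypothetical proxy address
		DAChallengeWindow:  90,
		DAResolveWindow:    90,
	}

	// PlasmaConfig validates the three DA fields and returns the plasma.Config subset.
	pcfg, err := cfg.PlasmaConfig()
	if err != nil {
		panic(err)
	}
	fmt.Println(pcfg.ChallengeWindow, pcfg.ResolveWindow) // 90 90

	// Both lookback helpers widen by ChallengeWindow + ResolveWindow when UsePlasma is set.
	fmt.Println(cfg.FinalityLookback()) // 180 (4*32+1 = 129 without plasma)
	fmt.Println(cfg.ReorgWindowSize())  // 3780 = 90 + 90 + SeqWindowSize (SeqWindowSize alone without plasma)
}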
diff --git a/op-plasma/commitment.go b/op-plasma/commitment.go new file mode 100644 index 000000000000..388eee00eeba --- /dev/null +++ b/op-plasma/commitment.go @@ -0,0 +1,56 @@ +package plasma + +import ( + "bytes" + "errors" + + "github.com/ethereum/go-ethereum/crypto" +) + +// ErrInvalidCommitment is returned when the commitment cannot be parsed into a known commitment type. +var ErrInvalidCommitment = errors.New("invalid commitment") + +// ErrCommitmentMismatch is returned when the commitment does not match the given input. +var ErrCommitmentMismatch = errors.New("commitment mismatch") + +// CommitmentType is the commitment type prefix. +type CommitmentType byte + +// KeccakCommitmentType is the default commitment type for the DA storage. +const Keccak256CommitmentType CommitmentType = 0 + +// Keccak256Commitment is the default commitment type for op-plasma. +type Keccak256Commitment []byte + +// Encode adds a commitment type prefix self describing the commitment. +func (c Keccak256Commitment) Encode() []byte { + return append([]byte{byte(Keccak256CommitmentType)}, c...) +} + +// Verify checks if the commitment matches the given input. +func (c Keccak256Commitment) Verify(input []byte) error { + if !bytes.Equal(c, crypto.Keccak256(input)) { + return ErrCommitmentMismatch + } + return nil +} + +// Keccak256 creates a new commitment from the given input. +func Keccak256(input []byte) Keccak256Commitment { + return Keccak256Commitment(crypto.Keccak256(input)) +} + +// DecodeKeccak256 validates and casts the commitment into a Keccak256Commitment. +func DecodeKeccak256(commitment []byte) (Keccak256Commitment, error) { + if commitment == nil || len(commitment) == 0 { + return nil, ErrInvalidCommitment + } + if commitment[0] != byte(Keccak256CommitmentType) { + return nil, ErrInvalidCommitment + } + c := commitment[1:] + if len(c) != 32 { + return nil, ErrInvalidCommitment + } + return c, nil +} diff --git a/op-plasma/daclient.go b/op-plasma/daclient.go index 80fb5b2cda1b..3f12048e4933 100644 --- a/op-plasma/daclient.go +++ b/op-plasma/daclient.go @@ -7,16 +7,11 @@ import ( "fmt" "io" "net/http" - - "github.com/ethereum/go-ethereum/crypto" ) // ErrNotFound is returned when the server could not find the input. var ErrNotFound = errors.New("not found") -// ErrCommitmentMismatch is returned when the server returns the wrong input for the given commitment. -var ErrCommitmentMismatch = errors.New("commitment mismatch") - // ErrInvalidInput is returned when the input is not valid for posting to the DA storage. var ErrInvalidInput = errors.New("invalid input") @@ -34,8 +29,12 @@ func NewDAClient(url string, verify bool) *DAClient { return &DAClient{url, verify} } -// GetInput returns the input data for the given commitment bytes. +// GetInput returns the input data for the given encoded commitment bytes. 
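+// The key is first decoded as a typed Keccak256 commitment; when the client is configured to
+// verify, the response body is checked against that commitment before it is returned.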
func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { + comm, err := DecodeKeccak256(key) + if err != nil { + return nil, err + } req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, key), nil) if err != nil { return nil, fmt.Errorf("failed to create HTTP request: %w", err) @@ -53,10 +52,10 @@ func (c *DAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { return nil, err } if c.verify { - exp := crypto.Keccak256(input) - if !bytes.Equal(exp, key) { - return nil, ErrCommitmentMismatch + if err := comm.Verify(input); err != nil { + return nil, err } + } return input, nil } @@ -66,7 +65,7 @@ func (c *DAClient) SetInput(ctx context.Context, img []byte) ([]byte, error) { if len(img) == 0 { return nil, ErrInvalidInput } - key := crypto.Keccak256(img) + key := Keccak256(img).Encode() body := bytes.NewReader(img) url := fmt.Sprintf("%s/put/0x%x", c.url, key) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) diff --git a/op-plasma/daclient_test.go b/op-plasma/daclient_test.go index c2afdfe38072..576a07d8af9f 100644 --- a/op-plasma/daclient_test.go +++ b/op-plasma/daclient_test.go @@ -9,9 +9,7 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" @@ -80,18 +78,23 @@ func TestDAClient(t *testing.T) { rng := rand.New(rand.NewSource(1234)) - input := testutils.RandomData(rng, 2000) + input := RandomData(rng, 2000) comm, err := client.SetInput(ctx, input) require.NoError(t, err) - require.Equal(t, comm, crypto.Keccak256(input)) + require.Equal(t, comm, Keccak256(input).Encode()) stored, err := client.GetInput(ctx, comm) require.NoError(t, err) require.Equal(t, input, stored) + // test validate commitment + badcomm := RandomData(rng, 32) + _, err = client.GetInput(ctx, badcomm) + require.ErrorIs(t, err, ErrInvalidCommitment) + // set a bad commitment in the store require.NoError(t, store.Put(comm, []byte("bad data"))) @@ -99,7 +102,7 @@ func TestDAClient(t *testing.T) { require.ErrorIs(t, err, ErrCommitmentMismatch) // test not found error - comm = crypto.Keccak256(testutils.RandomData(rng, 32)) + comm = Keccak256(RandomData(rng, 32)).Encode() _, err = client.GetInput(ctx, comm) require.ErrorIs(t, err, ErrNotFound) @@ -112,6 +115,6 @@ func TestDAClient(t *testing.T) { _, err = client.SetInput(ctx, input) require.Error(t, err) - _, err = client.GetInput(ctx, crypto.Keccak256(input)) + _, err = client.GetInput(ctx, Keccak256(input).Encode()) require.Error(t, err) } diff --git a/op-plasma/damgr.go b/op-plasma/damgr.go index f206678b67be..025b451b1e91 100644 --- a/op-plasma/damgr.go +++ b/op-plasma/damgr.go @@ -2,48 +2,343 @@ package plasma import ( "context" + "errors" + "fmt" + "io" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-bindings/bindings" "github.com/ethereum-optimism/optimism/op-service/eth" ) +// ErrPendingChallenge is return when data is not available but can still be challenged/resolved +// so derivation should halt temporarily. 
+var ErrPendingChallenge = errors.New("not found, pending challenge") + +// ErrExpiredChallenge is returned when a challenge was not resolved and derivation should skip this input. +var ErrExpiredChallenge = errors.New("challenge expired") + +// ErrMissingPastWindow is returned when the input data is MIA and cannot be challenged. +// This is a protocol fatal error. +var ErrMissingPastWindow = errors.New("data missing past window") + +// L1Fetcher is the required interface for syncing the DA challenge contract state. +type L1Fetcher interface { + InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) + FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) +} + +// DAStorage interface for calling the DA storage server. type DAStorage interface { GetInput(ctx context.Context, key []byte) ([]byte, error) SetInput(ctx context.Context, img []byte) ([]byte, error) } +// Config is the relevant subset of rollup config for plasma DA. +type Config struct { + // Required for filtering contract events + DAChallengeContractAddress common.Address + // The number of l1 blocks after the input is committed during which one can challenge. + ChallengeWindow uint64 + // The number of l1 blocks after a commitment is challenged during which one can resolve. + ResolveWindow uint64 +} + type DA struct { log log.Logger + cfg Config + metrics Metricer + storage DAStorage -} + l1 L1Fetcher -type Input struct { - Data eth.Data + state *State + + // the latest l1 block we synced challenge contract events from + origin eth.BlockID + // the latest recorded finalized head as per the challenge contract + finalizedHead eth.L1BlockRef + + finalizedHeadSignalFunc eth.HeadSignalFn } // NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig. -func NewPlasmaDA(log log.Logger, cfg CLIConfig) *DA { - return &DA{ - log: log, - storage: cfg.NewDAClient(), - } +func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, l1f L1Fetcher, metrics Metricer) *DA { + return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), l1f, metrics) } // NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface. -func NewPlasmaDAWithStorage(log log.Logger, storage DAStorage) *DA { +func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, l1f L1Fetcher, metrics Metricer) *DA { return &DA{ log: log, + cfg: cfg, storage: storage, + l1: l1f, + metrics: metrics, + state: NewState(log, metrics), } } +// OnFinalizedHeadSignal sets the callback function to be called when the finalized head is updated. +// This will signal to the engine queue that will set the proper L2 block as finalized. +func (d *DA) OnFinalizedHeadSignal(f eth.HeadSignalFn) { + d.finalizedHeadSignalFunc = f +} + // GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup // the challenge status in the DataAvailabilityChallenge L1 contract. -func (d *DA) GetInput(ctx context.Context, commitment []byte, blockNumber uint64) (Input, error) { +func (d *DA) GetInput(ctx context.Context, commitment []byte, blockId eth.BlockID) (eth.Data, error) { + // If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a + // challenge event for this commitment. Otherwise we mark the commitment as part of the cannonical + // chain so potential future challenge events can be selected. 
+ ch := d.state.GetOrTrackChallenge(commitment, blockId.Number, d.cfg.ChallengeWindow) + + // Fetch the input from the DA storage. data, err := d.storage.GetInput(ctx, commitment) + + // data is not found in storage but may be available if the challenge was resolved. + notFound := errors.Is(ErrNotFound, err) + + if err != nil && !notFound { + d.log.Error("failed to get preimage", "err", err) + // the storage client request failed for some other reason + // in which case derivation pipeline should be retried + return nil, err + } + + switch ch.challengeStatus { + case ChallengeActive: + if d.isExpired(ch.expiresAt) { + // this challenge has expired, this input must be skipped + return nil, ErrExpiredChallenge + } else if notFound { + // data is missing and a challenge is active, we must wait for the challenge to resolve + // hence we continue syncing new origins to sync the new challenge events. + if err := d.LookAhead(ctx); err != nil { + return nil, err + } + return nil, ErrPendingChallenge + } + case ChallengeExpired: + // challenge was marked as expired, skip + return nil, ErrExpiredChallenge + case ChallengeResolved: + // challenge was resolved, data is available in storage, return directly + if !notFound { + return data, nil + } + // data not found in storage, return from challenge resolved input + resolvedInput, err := d.state.GetResolvedInput(commitment) + if err != nil { + return nil, err + } + return resolvedInput, nil + default: + if notFound { + if d.isExpired(ch.expiresAt) { + // we're past the challenge window and the data is not available + return nil, ErrMissingPastWindow + } else { + // continue syncing challenges hoping it eventually is challenged and resolved + if err := d.LookAhead(ctx); err != nil { + return nil, err + } + return nil, ErrPendingChallenge + } + } + } + + return data, nil +} + +// isExpired returns whether the given expiration block number is lower or equal to the current head +func (d *DA) isExpired(bn uint64) bool { + return d.origin.Number >= bn +} + +// AdvanceL1Origin syncs any challenge events included in the l1 block, expires any active challenges +// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block +// as the new head for tracking challenges. If forwards an error if any new challenge have expired to +// trigger a derivation reset. 
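+// Calling it with an origin at or below the last processed block number is a no-op.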
+func (d *DA) AdvanceL1Origin(ctx context.Context, block eth.BlockID) error { + // do not repeat for the same origin + if block.Number <= d.origin.Number { + return nil + } + // sync challenges for the given block ID + if err := d.LoadChallengeEvents(ctx, block); err != nil { + return err + } + // advance challenge window, computing the finalized head + bn, err := d.state.ExpireChallenges(block.Number) if err != nil { - return Input{}, err + return err + } + + if bn > d.finalizedHead.Number { + ref, err := d.l1.L1BlockRefByNumber(ctx, bn) + if err != nil { + return err + } + d.metrics.RecordChallengesHead("finalized", bn) + + // if we get a greater finalized head, signal to the engine queue + if d.finalizedHeadSignalFunc != nil { + d.finalizedHeadSignalFunc(ctx, ref) + + } + // prune old state + d.state.Prune(bn) + d.finalizedHead = ref + + } + d.origin = block + d.metrics.RecordChallengesHead("latest", d.origin.Number) + + d.log.Info("processed plasma l1 origin", "origin", block, "next-finalized", bn, "finalized", d.finalizedHead.Number) + return nil +} + +// LoadChallengeEvents fetches the l1 block receipts and updates the challenge status +func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error { + //cached with deposits events call so not expensive + _, receipts, err := d.l1.FetchReceipts(ctx, block.Hash) + if err != nil { + return err + } + d.log.Info("updating challenges", "epoch", block.Number, "numReceipts", len(receipts)) + for i, rec := range receipts { + if rec.Status != types.ReceiptStatusSuccessful { + continue + } + for j, log := range rec.Logs { + if log.Address == d.cfg.DAChallengeContractAddress && len(log.Topics) > 0 && log.Topics[0] == ChallengeStatusEventABIHash { + event, err := DecodeChallengeStatusEvent(log) + if err != nil { + d.log.Error("failed to decode challenge event", "block", block.Number, "tx", i, "log", j, "err", err) + continue + } + d.log.Info("decoded challenge status event", "block", block.Number, "tx", i, "log", j, "event", event) + comm, err := DecodeKeccak256(event.ChallengedCommitment) + if err != nil { + d.log.Error("failed to decode commitment", "block", block.Number, "tx", i, "err", err) + continue + } + + bn := event.ChallengedBlockNumber.Uint64() + // if we are not tracking the commitment from processing the l1 origin in derivation, + // i.e. someone challenged garbage data, this challenge is invalid. 
+				if !d.state.IsTracking(comm.Encode(), bn) {
+					d.log.Warn("skipping invalid challenge", "block", bn)
+					continue
+				}
+				switch ChallengeStatus(event.Status) {
+				case ChallengeResolved:
+					// cached with the input resolution call so not expensive
+					_, txs, err := d.l1.InfoAndTxsByHash(ctx, block.Hash)
+					if err != nil {
+						d.log.Error("failed to fetch l1 block", "block", block.Number, "err", err)
+						continue
+					}
+					tx := txs[i]
+					// txs and receipts must be in the same order
+					if tx.Hash() != rec.TxHash {
+						d.log.Error("tx hash mismatch", "block", block.Number, "tx", i, "log", j, "txHash", tx.Hash(), "receiptTxHash", rec.TxHash)
+						continue
+					}
+					input, err := DecodeResolvedInput(tx.Data())
+					if err != nil {
+						d.log.Error("failed to decode resolved input", "block", block.Number, "tx", i, "err", err)
+						continue
+					}
+					if err := comm.Verify(input); err != nil {
+						d.log.Error("failed to verify commitment", "block", block.Number, "tx", i, "err", err)
+						continue
+					}
+					d.log.Debug("resolved input", "block", block.Number, "tx", i)
+					d.state.SetResolvedChallenge(comm.Encode(), input, log.BlockNumber)
+				case ChallengeActive:
+					d.state.SetActiveChallenge(comm.Encode(), log.BlockNumber, d.cfg.ResolveWindow)
+				default:
+					d.log.Warn("skipping unknown challenge status", "block", block.Number, "tx", i, "log", j, "status", event.Status)
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// LookAhead advances the challenges head by one block and processes it if it exists.
+// It is only used if the derivation pipeline stalls and we need to wait for a challenge to be resolved
+// to get the next input.
+func (d *DA) LookAhead(ctx context.Context) error {
+	blkRef, err := d.l1.L1BlockRefByNumber(ctx, d.origin.Number+1)
+	if errors.Is(err, ethereum.NotFound) {
+		return io.EOF
+	}
+	if err != nil {
+		d.log.Error("failed to fetch l1 head", "err", err)
+		return err
+	}
+	return d.AdvanceL1Origin(ctx, blkRef.ID())
+}
+
+var (
+	ChallengeStatusEventName    = "ChallengeStatusChanged"
+	ChallengeStatusEventABI     = "ChallengeStatusChanged(uint256,bytes,uint8)"
+	ChallengeStatusEventABIHash = crypto.Keccak256Hash([]byte(ChallengeStatusEventABI))
+)
+
+// State returns the internal challenge-tracking state for inspection.
+func (d *DA) State() *State {
+	return d.state
+}
+
+// DecodeChallengeStatusEvent decodes the challenge status event from the log data and the indexed challenged
+// hash and block number from the topics.
+func DecodeChallengeStatusEvent(log *types.Log) (*bindings.DataAvailabilityChallengeChallengeStatusChanged, error) {
+	// the ABI is lazily loaded and cached after the first decode
+	dacAbi, err := bindings.DataAvailabilityChallengeMetaData.GetAbi()
+	if err != nil {
+		return nil, err
+	}
+	var event bindings.DataAvailabilityChallengeChallengeStatusChanged
+	err = dacAbi.UnpackIntoInterface(&event, ChallengeStatusEventName, log.Data)
+	if err != nil {
+		return nil, err
+	}
+	var indexed abi.Arguments
+	for _, arg := range dacAbi.Events[ChallengeStatusEventName].Inputs {
+		if arg.Indexed {
+			indexed = append(indexed, arg)
+		}
+	}
+	if err := abi.ParseTopics(&event, indexed, log.Topics[1:]); err != nil {
+		return nil, err
+	}
+	return &event, nil
+}
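Illustrative aside, not part of the diff: the filter used in LoadChallengeEvents above boils down to matching the emitting contract and the first topic against the ChallengeStatusChanged signature hash. A minimal sketch (the helper name is mine):

package plasma

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// isChallengeStatusLog reports whether a log was emitted by the DA challenge contract and
// carries the ChallengeStatusChanged event signature as its first topic, i.e. whether it is
// worth handing to DecodeChallengeStatusEvent.
func isChallengeStatusLog(lg *types.Log, daChallengeContract common.Address) bool {
	return lg.Address == daChallengeContract &&
		len(lg.Topics) > 0 &&
		lg.Topics[0] == ChallengeStatusEventABIHash
}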
+
+// DecodeResolvedInput decodes the preimage bytes from the tx input data.
+func DecodeResolvedInput(data []byte) ([]byte, error) {
+	dacAbi, _ := bindings.DataAvailabilityChallengeMetaData.GetAbi()
+
+	args := make(map[string]interface{})
+	err := dacAbi.Methods["resolve"].Inputs.UnpackIntoMap(args, data[4:])
+	if err != nil {
+		return nil, err
+	}
+	rd, ok := args["resolveData"].([]byte)
+	if !ok || rd == nil {
+		return nil, fmt.Errorf("invalid resolve data")
 	}
-	return Input{Data: data}, nil
+	return rd, nil
 }
diff --git a/op-plasma/damgr_test.go b/op-plasma/damgr_test.go
new file mode 100644
index 000000000000..ea3e8eb24ae5
--- /dev/null
+++ b/op-plasma/damgr_test.go
@@ -0,0 +1,209 @@
+package plasma
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/ethereum-optimism/optimism/op-service/testlog"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/stretchr/testify/require"
+)
+
+func RandomData(rng *rand.Rand, size int) []byte {
+	out := make([]byte, size)
+	rng.Read(out)
+	return out
+}
+
+// TestDAChallengeState is a simple test with small values to verify the finalized head logic.
+func TestDAChallengeState(t *testing.T) {
+	logger := testlog.Logger(t, log.LvlDebug)
+
+	rng := rand.New(rand.NewSource(1234))
+	state := NewState(logger, &NoopMetrics{})
+
+	i := uint64(1)
+
+	challengeWindow := uint64(6)
+	resolveWindow := uint64(6)
+
+	// track commitments in the first 10 blocks
+	for ; i < 10; i++ {
+		// this is akin to stepping the derivation pipeline through a range of blocks, each with a commitment
+		state.SetInputCommitment(RandomData(rng, 32), i, challengeWindow)
+	}
+
+	// blocks are finalized after the challenge window expires
+	bn, err := state.ExpireChallenges(10)
+	require.NoError(t, err)
+	// finalized head = 10 - 6 = 4
+	require.Equal(t, uint64(4), bn)
+
+	// track the next commitment and mark it as challenged
+	c := RandomData(rng, 32)
+	// add input commitment at block i = 10
+	state.SetInputCommitment(c, 10, challengeWindow)
+	// i+4 is the block at which it was challenged
+	state.SetActiveChallenge(c, 14, resolveWindow)
+
+	for j := i + 1; j < 18; j++ {
+		// continue walking the pipeline through some more blocks with commitments
+		state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
+	}
+
+	// finalized l1 origin should not extend past the resolve window
+	bn, err = state.ExpireChallenges(18)
+	require.NoError(t, err)
+	// finalized is active_challenge_block - 1 = 10 - 1 and cannot move until the challenge expires
+	require.Equal(t, uint64(9), bn)
+
+	// walk past the resolve window
+	for j := uint64(18); j < 22; j++ {
+		state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
+	}
+
+	// the active challenge expired without resolution, so the finalized head catches back up
+	// to the challenge window and a reorg is signaled
+	bn, err = state.ExpireChallenges(22)
+	require.ErrorIs(t, err, ErrReorgRequired)
+	// finalized head is now 22 - 6 = 16
+	require.Equal(t, uint64(16), bn)
+
+	// cleanup state we don't need anymore
+	state.Prune(22)
+	// now if we expire the challenges again, it won't request a reorg again
+	bn, err = state.ExpireChallenges(22)
+	require.NoError(t, err)
+	// finalized head hasn't moved
+	require.Equal(t, uint64(16), bn)
+
+	i = 22
+	// add one more commitment and challenge it
+	c = RandomData(rng, 32)
+	state.SetInputCommitment(c, 22, challengeWindow)
+	// challenge 3 blocks after
+	state.SetActiveChallenge(c, 25, resolveWindow)
+
+	// exceed the challenge window with more commitments
+	for j := uint64(23); j < 30; j++ {
+		state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow)
+	}
+
+	// finalized head should not extend past the resolve window
+	bn, err = state.ExpireChallenges(30)
+	require.NoError(t, err)
+	// finalized head is stuck waiting for resolve window
+	require.Equal(t, uint64(21), bn)
+
+	input := RandomData(rng, 100)
+	// resolve the challenge
+	state.SetResolvedChallenge(c, input, 30)
+
+	// finalized head catches up
+	bn, err = state.ExpireChallenges(31)
+	require.NoError(t, err)
+	// finalized head is now 31 - 6 = 25
+	require.Equal(t, uint64(25), bn)
+
+	// the resolved input is also stored
+	storedInput, err := state.GetResolvedInput(c)
+	require.NoError(t, err)
+	require.Equal(t, input, storedInput)
+}
+
+// TestExpireChallenges expires challenges and prunes the state for longer windows
+// with commitments every 6 blocks.
+func TestExpireChallenges(t *testing.T) {
+	logger := testlog.Logger(t, log.LvlDebug)
+
+	rng := rand.New(rand.NewSource(1234))
+	state := NewState(logger, &NoopMetrics{})
+
+	comms := make(map[uint64][]byte)
+
+	i := uint64(3713854)
+
+	var finalized uint64
+
+	challengeWindow := uint64(90)
+	resolveWindow := uint64(90)
+
+	// increment new commitments every 6 blocks
+	for ; i < 3713948; i += 6 {
+		comm := RandomData(rng, 32)
+		comms[i] = comm
+		logger.Info("set commitment", "block", i)
+		cm := state.GetOrTrackChallenge(comm, i, challengeWindow)
+		require.NotNil(t, cm)
+
+		bn, err := state.ExpireChallenges(i)
+		logger.Info("expire challenges", "finalized head", bn, "err", err)
+
+		// only update finalized head if it has moved
+		if bn > finalized {
+			finalized = bn
+			// prune unused state
+			state.Prune(bn)
+		}
+	}
+
+	// activate a couple of subsequent challenges
+	state.SetActiveChallenge(comms[3713926], 3713948, resolveWindow)
+	state.SetActiveChallenge(comms[3713932], 3713950, resolveWindow)
+
+	// continue incrementing commitments
+	for ; i < 3714038; i += 6 {
+		comm := RandomData(rng, 32)
+		comms[i] = comm
+		logger.Info("set commitment", "block", i)
+		cm := state.GetOrTrackChallenge(comm, i, challengeWindow)
+		require.NotNil(t, cm)
+
+		bn, err := state.ExpireChallenges(i)
+		logger.Info("expire challenges", "expired", bn, "err", err)
+
+		if bn > finalized {
+			finalized = bn
+			state.Prune(bn)
+		}
+	}
+
+	// finalized head does not move as it expires previously seen blocks
+	bn, err := state.ExpireChallenges(3714034)
+	require.NoError(t, err)
+	require.Equal(t, uint64(3713920), bn)
+
+	bn, err = state.ExpireChallenges(3714035)
+	require.NoError(t, err)
+	require.Equal(t, uint64(3713920), bn)
+
+	bn, err = state.ExpireChallenges(3714036)
+	require.NoError(t, err)
+	require.Equal(t, uint64(3713920), bn)
+
+	bn, err = state.ExpireChallenges(3714037)
+	require.NoError(t, err)
+	require.Equal(t, uint64(3713920), bn)
+
+	// lastly we get to the resolve window and trigger a reorg
+	bn, err = state.ExpireChallenges(3714038)
+	require.ErrorIs(t, err, ErrReorgRequired)
+
+	// this is simulating a pipeline reset where it walks back challenge + resolve window
+	for i := uint64(3713854); i < 3714044; i += 6 {
+		cm := state.GetOrTrackChallenge(comms[i], i, challengeWindow)
+		require.NotNil(t, cm)
+
+		// check that the challenge status was updated to expired
+		if i == 3713926 {
+			require.Equal(t, ChallengeExpired, cm.challengeStatus)
+		}
+	}
+
+	bn, err = state.ExpireChallenges(3714038)
+	require.NoError(t, err)
+
+	// finalized at last
+	require.Equal(t, uint64(3713926), bn)
+}
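Illustrative aside, not part of the diff: the assertions above all follow the same expiry arithmetic used by SetInputCommitment and SetActiveChallenge. A minimal restatement of that rule, with a hypothetical helper name:

package plasma

// expiryOf restates the rule the tests above exercise: an unchallenged commitment included at
// L1 block commBlock becomes finalizable once the challenge window has passed, while a
// commitment challenged at block challengedAt stays open until the end of the resolve window.
// The real bookkeeping lives in State.SetInputCommitment / State.SetActiveChallenge.
func expiryOf(commBlock, challengeWindow uint64, challenged bool, challengedAt, resolveWindow uint64) uint64 {
	if challenged {
		return challengedAt + resolveWindow
	}
	return commBlock + challengeWindow
}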
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" @@ -25,6 +24,11 @@ func NewMockDAClient(log log.Logger) *MockDAClient { } func (c *MockDAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) { + // Validate the commitment to make sure we only pass encoded types. + _, err := DecodeKeccak256(key) + if err != nil { + return nil, err + } bytes, err := c.store.Get(key) if err != nil { return nil, ErrNotFound @@ -33,7 +37,7 @@ func (c *MockDAClient) GetInput(ctx context.Context, key []byte) ([]byte, error) } func (c *MockDAClient) SetInput(ctx context.Context, data []byte) ([]byte, error) { - key := crypto.Keccak256(data) + key := Keccak256(data).Encode() return key, c.store.Put(key, data) } @@ -56,7 +60,7 @@ func (f *DAErrFaker) GetInput(ctx context.Context, key []byte) ([]byte, error) { return f.Client.GetInput(ctx, key) } -func (f *DAErrFaker) SetPreImage(ctx context.Context, data []byte) ([]byte, error) { +func (f *DAErrFaker) SetInput(ctx context.Context, data []byte) ([]byte, error) { if err := f.setInputErr; err != nil { f.setInputErr = nil return nil, err diff --git a/op-plasma/dastate.go b/op-plasma/dastate.go new file mode 100644 index 000000000000..ec4fc5832a91 --- /dev/null +++ b/op-plasma/dastate.go @@ -0,0 +1,182 @@ +package plasma + +import ( + "container/heap" + "errors" + + "github.com/ethereum/go-ethereum/log" +) + +// ErrReorgRequired is returned when a commitment was derived but for which the challenge expired. +// This requires a reorg to rederive without the input even if the input was previously available. +var ErrReorgRequired = errors.New("reorg required") + +type ChallengeStatus uint8 + +const ( + ChallengeUnititialized ChallengeStatus = iota + ChallengeActive + ChallengeResolved + ChallengeExpired +) + +// Commitment keeps track of the onchain state of an input commitment. +type Commitment struct { + hash []byte // the keccak256 hash of the input + input []byte // the input itself if it was resolved onchain + expiresAt uint64 // represents the block number after which the commitment can no longer be challenged or if challenged no longer be resolved. + blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox + challengeStatus ChallengeStatus // latest known challenge status +} + +// CommQueue is a queue of commitments ordered by block number. +type CommQueue []*Commitment + +var _ heap.Interface = (*CommQueue)(nil) + +func (c CommQueue) Len() int { return len(c) } + +func (c CommQueue) Less(i, j int) bool { + return c[i].blockNumber < c[j].blockNumber +} + +func (c CommQueue) Swap(i, j int) { + c[i], c[j] = c[j], c[i] +} + +func (c *CommQueue) Push(x any) { + *c = append(*c, x.(*Commitment)) +} + +func (c *CommQueue) Pop() any { + old := *c + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + *c = old[0 : n-1] + return item +} + +// State tracks the commitment and their challenges in order of l1 inclusion. +type State struct { + comms CommQueue + commsByHash map[string]*Commitment + log log.Logger + metrics Metricer +} + +func NewState(log log.Logger, m Metricer) *State { + return &State{ + comms: make(CommQueue, 0), + commsByHash: make(map[string]*Commitment), + log: log, + metrics: m, + } +} + +// IsTracking returns whether we currently have a commitment for the given hash. 
+
+// IsTracking returns whether we currently have a commitment for the given hash.
+func (s *State) IsTracking(comm []byte, bn uint64) bool {
+	if c, ok := s.commsByHash[string(comm)]; ok {
+		return c.blockNumber == bn
+	}
+	return false
+}
+
+// SetActiveChallenge switches the state of a given commitment to active challenge. Noop if
+// the commitment is not tracked as we don't want to track challenges for invalid commitments.
+func (s *State) SetActiveChallenge(comm []byte, challengedAt uint64, resolveWindow uint64) {
+	if c, ok := s.commsByHash[string(comm)]; ok {
+		c.expiresAt = challengedAt + resolveWindow
+		c.challengeStatus = ChallengeActive
+		s.metrics.RecordActiveChallenge(c.blockNumber, challengedAt, comm)
+	}
+}
+
+// SetResolvedChallenge switches the state of a given commitment to resolved. Noop if
+// the commitment is not tracked as we don't want to track challenges for invalid commitments.
+// The input posted onchain is stored in the state for later retrieval.
+func (s *State) SetResolvedChallenge(comm []byte, input []byte, resolvedAt uint64) {
+	if c, ok := s.commsByHash[string(comm)]; ok {
+		c.challengeStatus = ChallengeResolved
+		c.expiresAt = resolvedAt
+		c.input = input
+		s.metrics.RecordResolvedChallenge(comm)
+	}
+}
+
+// SetInputCommitment initializes a new commitment and adds it to the state.
+func (s *State) SetInputCommitment(comm []byte, committedAt uint64, challengeWindow uint64) *Commitment {
+	c := &Commitment{
+		hash:        comm,
+		expiresAt:   committedAt + challengeWindow,
+		blockNumber: committedAt,
+	}
+	s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
+	heap.Push(&s.comms, c)
+	s.commsByHash[string(comm)] = c
+
+	return c
+}
+
+// GetOrTrackChallenge returns the commitment for the given hash if it is already tracked, or
+// initializes a new commitment and adds it to the state.
+func (s *State) GetOrTrackChallenge(comm []byte, bn uint64, challengeWindow uint64) *Commitment {
+	if c, ok := s.commsByHash[string(comm)]; ok {
+		return c
+	}
+	return s.SetInputCommitment(comm, bn, challengeWindow)
+}
+
+// GetResolvedInput returns the input bytes if the commitment was resolved onchain.
+func (s *State) GetResolvedInput(comm []byte) ([]byte, error) {
+	if c, ok := s.commsByHash[string(comm)]; ok {
+		return c.input, nil
+	}
+	return nil, errors.New("commitment not found")
+}
+
+// ExpireChallenges walks back from the oldest commitment to find the latest l1 origin
+// for which input data can no longer be challenged. It also marks any active challenges
+// as expired based on the new latest l1 origin. If any active challenges are expired
+// it returns an error to signal that a derivation pipeline reset is required.
+func (s *State) ExpireChallenges(bn uint64) (uint64, error) {
+	latest := uint64(0)
+	var err error
+	for i := 0; i < len(s.comms); i++ {
+		c := s.comms[i]
+		if c.expiresAt <= bn && c.blockNumber > latest {
+			latest = c.blockNumber
+
+			if c.challengeStatus == ChallengeActive {
+				c.challengeStatus = ChallengeExpired
+				s.metrics.RecordExpiredChallenge(c.hash)
+				err = ErrReorgRequired
+			}
+		} else {
+			break
+		}
+	}
+	return latest, err
+}
+
+// safely prune in case a reset is deeper than the finalized l1
+const commPruneMargin = 200
+
+// Prune removes commitments once they can no longer be challenged or resolved.
+func (s *State) Prune(bn uint64) {
+	if bn > commPruneMargin {
+		bn -= commPruneMargin
+	} else {
+		bn = 0
+	}
+	for i := 0; i < len(s.comms); i++ {
+		c := s.comms[i]
+		if c.blockNumber < bn {
+			s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
+			delete(s.commsByHash, string(c.hash))
+		} else {
+			s.comms = s.comms[i:]
+			break
+		}
+	}
+}
diff --git a/op-plasma/metrics.go b/op-plasma/metrics.go
new file mode 100644
index 000000000000..9843e91f3863
--- /dev/null
+++ b/op-plasma/metrics.go
@@ -0,0 +1,73 @@
+package plasma
+
+import (
+	"github.com/ethereum-optimism/optimism/op-service/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type Metricer interface {
+	RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte)
+	RecordResolvedChallenge(hash []byte)
+	RecordExpiredChallenge(hash []byte)
+	RecordChallengesHead(name string, num uint64)
+	RecordStorageError()
+}
+
+type Metrics struct {
+	ChallengesStatus *prometheus.GaugeVec
+	ChallengesHead   *prometheus.GaugeVec
+
+	StorageErrors *metrics.Event
+}
+
+var _ Metricer = (*Metrics)(nil)
+
+func MakeMetrics(ns string, factory metrics.Factory) *Metrics {
+	return &Metrics{
+		ChallengesStatus: factory.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: ns,
+			Name:      "challenges_status",
+			Help:      "Gauge representing the status of challenges synced",
+		}, []string{"status"}),
+		ChallengesHead: factory.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: ns,
+			Name:      "challenges_head",
+			Help:      "Gauge representing the l1 heads of challenges synced",
+		}, []string{"type"}),
+		StorageErrors: metrics.NewEvent(factory, ns, "", "storage_errors", "errors when fetching or uploading to storage service"),
+	}
+}
+
+func (m *Metrics) RecordChallenge(status string) {
+	m.ChallengesStatus.WithLabelValues(status).Inc()
+}
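Illustrative aside, not part of the diff: a sketch of how MakeMetrics might be wired up to a Prometheus registry. It assumes op-service's metrics.With(registry) returns a metrics.Factory and uses a made-up namespace; adjust to however the embedding service builds its factory.

package plasma

import (
	"github.com/ethereum-optimism/optimism/op-service/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

// exampleMetricsWiring constructs the plasma metrics against a fresh registry and records a
// sample head update. The "op_plasma" namespace is an assumption for illustration only.
func exampleMetricsWiring() *Metrics {
	registry := prometheus.NewRegistry()
	m := MakeMetrics("op_plasma", metrics.With(registry))
	m.RecordChallengesHead("latest", 1234)
	return m
}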
+
+// RecordActiveChallenge records when a commitment is challenged, including the block where the commitment
+// is included, the block where the commitment was challenged, and the commitment hash.
+func (m *Metrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) {
+	m.RecordChallenge("active")
+}
+
+func (m *Metrics) RecordResolvedChallenge(hash []byte) {
+	m.RecordChallenge("resolved")
+}
+
+func (m *Metrics) RecordExpiredChallenge(hash []byte) {
+	m.RecordChallenge("expired")
+}
+
+func (m *Metrics) RecordStorageError() {
+	m.StorageErrors.Record()
+}
+
+func (m *Metrics) RecordChallengesHead(name string, num uint64) {
+	m.ChallengesHead.WithLabelValues(name).Set(float64(num))
+}
+
+type NoopMetrics struct{}
+
+func (m *NoopMetrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) {}
+func (m *NoopMetrics) RecordResolvedChallenge(hash []byte)                                    {}
+func (m *NoopMetrics) RecordExpiredChallenge(hash []byte)                                     {}
+func (m *NoopMetrics) RecordChallengesHead(name string, num uint64)                           {}
+func (m *NoopMetrics) RecordStorageError()                                                    {}
diff --git a/packages/contracts-bedrock/deploy-config/devnetL1-template.json b/packages/contracts-bedrock/deploy-config/devnetL1-template.json
index 0ea82eb7bc55..52f09058e1f2 100644
--- a/packages/contracts-bedrock/deploy-config/devnetL1-template.json
+++ b/packages/contracts-bedrock/deploy-config/devnetL1-template.json
@@ -62,9 +62,9 @@
   "disputeGameFinalityDelaySeconds": 6,
   "respectedGameType": 0,
   "useFaultProofs": false,
-  "usePlasma": false,
-  "daChallengeWindow": 1000,
-  "daResolveWindow": 1000,
+  "usePlasma": true,
+  "daChallengeWindow": 6,
+  "daResolveWindow": 6,
   "daBondSize": 1000000,
   "daResolverRefundPercentage": 0
 }
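Illustrative aside, not part of the diff: the devnet config above enables plasma mode and shrinks both DA windows to 6 so the action tests can exercise expiry quickly. A rough sketch of how the two windows compose, assuming (as in the State logic above) that both are measured in L1 blocks:

package plasma

// worstCaseResolution returns the last L1 block at which the availability of a commitment
// included at block n can still change: a challenge opened at the very end of the challenge
// window pushes expiry out by the full resolve window. With the devnet values above this is
// n + 6 + 6 = n + 12.
func worstCaseResolution(n, challengeWindow, resolveWindow uint64) uint64 {
	return n + challengeWindow + resolveWindow
}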