diff --git a/op-e2e/actions/plasma_test.go b/op-e2e/actions/plasma_test.go index e4bf18229eb9..0c1426b48a58 100644 --- a/op-e2e/actions/plasma_test.go +++ b/op-e2e/actions/plasma_test.go @@ -76,7 +76,7 @@ func NewL2PlasmaDA(t Testing, params ...PlasmaParam) *L2PlasmaDA { plasmaCfg, err := sd.RollupCfg.PlasmaConfig() require.NoError(t, err) - daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, l1F, &plasma.NoopMetrics{}) + daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, &plasma.NoopMetrics{}) sequencer := NewL2Sequencer(t, log, l1F, nil, daMgr, engCl, sd.RollupCfg, 0) miner.ActL1SetFeeRecipient(common.Address{'A'}) @@ -134,7 +134,7 @@ func (a *L2PlasmaDA) NewVerifier(t Testing) *L2Verifier { l1F, err := sources.NewL1Client(a.miner.RPCClient(), a.log, nil, sources.L1ClientDefaultConfig(a.sd.RollupCfg, false, sources.RPCKindBasic)) require.NoError(t, err) - daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, l1F, &plasma.NoopMetrics{}) + daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, &plasma.NoopMetrics{}) verifier := NewL2Verifier(t, a.log, l1F, nil, daMgr, engCl, a.sd.RollupCfg, &sync.Config{}, safedb.Disabled) diff --git a/op-node/node/node.go b/op-node/node/node.go index 1960676f1421..324a595f4ab2 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -391,7 +391,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger if cfg.Plasma.Enabled && err != nil { return fmt.Errorf("failed to get plasma config: %w", err) } - plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.l1Source, n.metrics.PlasmaMetrics) + plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.metrics.PlasmaMetrics) if cfg.SafeDBPath != "" { n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath) safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath) diff --git a/op-node/rollup/derive/data_source.go b/op-node/rollup/derive/data_source.go index 3cef221cf4be..45b4e3a63c9a 100644 --- a/op-node/rollup/derive/data_source.go +++ b/op-node/rollup/derive/data_source.go @@ -19,6 +19,8 @@ type DataIter interface { type L1TransactionFetcher interface { InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) + FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) } type L1BlobsFetcher interface { @@ -28,9 +30,9 @@ type L1BlobsFetcher interface { type PlasmaInputFetcher interface { // GetInput fetches the input for the given commitment at the given block number from the DA storage service. - GetInput(ctx context.Context, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) + GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) // AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events. - AdvanceL1Origin(ctx context.Context, blockId eth.BlockID) error + AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error // Reset the challenge origin in case of L1 reorg Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error // Notify L1 finalized head so plasma finality is always behind L1 @@ -82,7 +84,7 @@ func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, b } if ds.dsCfg.plasmaEnabled { // plasma([calldata | blobdata](l1Ref)) -> data - return NewPlasmaDataSource(ds.log, src, ds.plasmaFetcher, ref.ID()), nil + return NewPlasmaDataSource(ds.log, src, ds.fetcher, ds.plasmaFetcher, ref.ID()), nil } return src, nil } diff --git a/op-node/rollup/derive/plasma_data_source.go b/op-node/rollup/derive/plasma_data_source.go index 3523af2572d4..d5eb552beb49 100644 --- a/op-node/rollup/derive/plasma_data_source.go +++ b/op-node/rollup/derive/plasma_data_source.go @@ -16,16 +16,18 @@ type PlasmaDataSource struct { log log.Logger src DataIter fetcher PlasmaInputFetcher + l1 L1TransactionFetcher id eth.BlockID // keep track of a pending commitment so we can keep trying to fetch the input. comm plasma.Keccak256Commitment } -func NewPlasmaDataSource(log log.Logger, src DataIter, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource { +func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1TransactionFetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource { return &PlasmaDataSource{ log: log, src: src, fetcher: fetcher, + l1: l1, id: id, } } @@ -35,7 +37,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { // before we can proceed to fetch the input data. This function can be called multiple times // for the same origin and noop if the origin was already processed. It is also called if // there is not commitment in the current origin. - if err := s.fetcher.AdvanceL1Origin(ctx, s.id); err != nil { + if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil { if errors.Is(err, plasma.ErrReorgRequired) { return nil, NewResetError(fmt.Errorf("new expired challenge")) } @@ -58,7 +60,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { s.comm = comm } // use the commitment to fetch the input from the plasma DA provider. - data, err := s.fetcher.GetInput(ctx, s.comm, s.id) + data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id) // GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager // continued syncing origins detached from the pipeline origin. if errors.Is(err, plasma.ErrReorgRequired) { @@ -66,7 +68,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { return nil, NewResetError(err) } else if errors.Is(err, plasma.ErrExpiredChallenge) { // this commitment was challenged and the challenge expired. - s.log.Warn("challenge expired, skipping batch", "comm", fmt.Sprintf("%x", s.comm)) + s.log.Warn("challenge expired, skipping batch", "comm", s.comm) s.comm = nil // skip the input return s.Next(ctx) diff --git a/op-node/rollup/derive/plasma_data_source_test.go b/op-node/rollup/derive/plasma_data_source_test.go index d62f85806ae9..ba2e7c6f06ce 100644 --- a/op-node/rollup/derive/plasma_data_source_test.go +++ b/op-node/rollup/derive/plasma_data_source_test.go @@ -58,7 +58,7 @@ func TestPlasmaDataSource(t *testing.T) { daState := plasma.NewState(logger, metrics) - da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, l1F, metrics, daState) + da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState) finalitySignal := &MockFinalitySignal{} da.OnFinalizedHeadSignal(finalitySignal.OnFinalized) @@ -293,7 +293,7 @@ func TestPlasmaDataSourceStall(t *testing.T) { daState := plasma.NewState(logger, metrics) - da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, l1F, metrics, daState) + da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState) finalitySignal := &MockFinalitySignal{} da.OnFinalizedHeadSignal(finalitySignal.OnFinalized) @@ -411,7 +411,7 @@ func TestPlasmaDataSourceInvalidData(t *testing.T) { ChallengeWindow: 90, ResolveWindow: 90, } - da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, l1F, &plasma.NoopMetrics{}) + da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, &plasma.NoopMetrics{}) // Create rollup genesis and config l1Time := uint64(2) diff --git a/op-plasma/damgr.go b/op-plasma/damgr.go index 37e4b6ff90b8..13d67aa505ba 100644 --- a/op-plasma/damgr.go +++ b/op-plasma/damgr.go @@ -63,8 +63,8 @@ type DA struct { metrics Metricer storage DAStorage - l1 L1Fetcher + // the DA state keeps track of all the commitments and their challenge status. state *State // the latest l1 block we synced challenge contract events from @@ -74,35 +74,34 @@ type DA struct { // the latest recorded finalized head as per the l1 finalization signal l1FinalizedHead eth.L1BlockRef // flag the reset function we are resetting because of an expired challenge - resetting int + resetting bool finalizedHeadSignalFunc HeadSignalFn } // NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig. -func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, l1f L1Fetcher, metrics Metricer) *DA { - return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), l1f, metrics) +func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA { + return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), metrics) } // NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface. -func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, l1f L1Fetcher, metrics Metricer) *DA { +func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metrics Metricer) *DA { return &DA{ log: log, cfg: cfg, storage: storage, - l1: l1f, metrics: metrics, state: NewState(log, metrics), } } // NewPlasmaWithState creates a plasma storage from initial state used for testing in isolation. -func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, l1f L1Fetcher, metrics Metricer, state *State) *DA { +// We pass the L1Fetcher to each method so it is kept in sync with the conf depth of the pipeline. +func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics Metricer, state *State) *DA { return &DA{ log: log, cfg: cfg, storage: storage, - l1: l1f, metrics: metrics, state: state, } @@ -138,20 +137,24 @@ func (d *DA) Finalize(l1Finalized eth.L1BlockRef) { // LookAhead increments the challenges origin and process the new block if it exists. // It is used when the derivation pipeline stalls due to missing data and we need to continue // syncing challenge events until the challenge is resolved or expires. -func (d *DA) LookAhead(ctx context.Context) error { - blkRef, err := d.l1.L1BlockRefByNumber(ctx, d.origin.Number+1) +func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error { + blkRef, err := l1.L1BlockRefByNumber(ctx, d.origin.Number+1) // temporary error, will do a backoff if err != nil { return err } - return d.AdvanceL1Origin(ctx, blkRef.ID()) + return d.AdvanceL1Origin(ctx, l1, blkRef.ID()) } // Reset the challenge event derivation origin in case of L1 reorg func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error { - // resetting due to expired challenge, do not clear state - if d.resetting > 0 { - d.resetting-- + // resetting due to expired challenge, do not clear state. + // If the DA source returns ErrReset, the pipeline is forced to reset by the rollup driver. + // In that case the Reset function will be called immediately, BEFORE the pipeline can + // call any further stage to step. Thus the state will NOT be cleared if the reset originates + // from this stage of the pipeline. + if d.resetting { + d.resetting = false } else { // resetting due to L1 reorg, clear state d.origin = base.ID() @@ -162,7 +165,7 @@ func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemC // GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup // the challenge status in the DataAvailabilityChallenge L1 contract. -func (d *DA) GetInput(ctx context.Context, comm Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { +func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { // If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a // challenge event for this commitment. Otherwise we mark the commitment as part of the canonical // chain so potential future challenge events can be selected. @@ -189,7 +192,7 @@ func (d *DA) GetInput(ctx context.Context, comm Keccak256Commitment, blockId eth } else if notFound { // data is missing and a challenge is active, we must wait for the challenge to resolve // hence we continue syncing new origins to sync the new challenge events. - if err := d.LookAhead(ctx); err != nil { + if err := d.LookAhead(ctx, l1); err != nil { return nil, err } return nil, ErrPendingChallenge @@ -215,7 +218,7 @@ func (d *DA) GetInput(ctx context.Context, comm Keccak256Commitment, blockId eth return nil, ErrMissingPastWindow } else { // continue syncing challenges hoping it eventually is challenged and resolved - if err := d.LookAhead(ctx); err != nil { + if err := d.LookAhead(ctx, l1); err != nil { return nil, err } return nil, ErrPendingChallenge @@ -235,27 +238,27 @@ func (d *DA) isExpired(bn uint64) bool { // after the new resolveWindow, computes and signals the new finalized head and sets the l1 block // as the new head for tracking challenges. If forwards an error if any new challenge have expired to // trigger a derivation reset. -func (d *DA) AdvanceL1Origin(ctx context.Context, block eth.BlockID) error { +func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { // do not repeat for the same origin if block.Number <= d.origin.Number { return nil } // sync challenges for the given block ID - if err := d.LoadChallengeEvents(ctx, block); err != nil { + if err := d.LoadChallengeEvents(ctx, l1, block); err != nil { return err } // advance challenge window, computing the finalized head bn, err := d.state.ExpireChallenges(block.Number) if err != nil { // warn the reset function not to clear the state - d.resetting++ + d.resetting = true return err } // finalized head signal is called only when the finalized head number increases // and the l1 finalized head ahead of the DA finalized head. if bn > d.finalizedHead.Number { - ref, err := d.l1.L1BlockRefByNumber(ctx, bn) + ref, err := l1.L1BlockRefByNumber(ctx, bn) if err != nil { return err } @@ -273,9 +276,9 @@ func (d *DA) AdvanceL1Origin(ctx context.Context, block eth.BlockID) error { } // LoadChallengeEvents fetches the l1 block receipts and updates the challenge status -func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error { +func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { // filter any challenge event logs in the block - logs, err := d.fetchChallengeLogs(ctx, block) + logs, err := d.fetchChallengeLogs(ctx, l1, block) if err != nil { return err } @@ -290,7 +293,7 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error { switch status { case ChallengeResolved: // cached with input resolution call so not expensive - _, txs, err := d.l1.InfoAndTxsByHash(ctx, block.Hash) + _, txs, err := l1.InfoAndTxsByHash(ctx, block.Hash) if err != nil { d.log.Error("failed to fetch l1 block", "block", block.Number, "err", err) continue @@ -330,9 +333,9 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error { } // fetchChallengeLogs returns logs for challenge events if any for the given block -func (d *DA) fetchChallengeLogs(ctx context.Context, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive +func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive var logs []*types.Log - _, receipts, err := d.l1.FetchReceipts(ctx, block.Hash) + _, receipts, err := l1.FetchReceipts(ctx, block.Hash) if err != nil { return nil, err } diff --git a/op-plasma/damgr_test.go b/op-plasma/damgr_test.go index c69fb3e1215d..b80b2a33046f 100644 --- a/op-plasma/damgr_test.go +++ b/op-plasma/damgr_test.go @@ -261,7 +261,7 @@ func TestFilterInvalidBlockNumber(t *testing.T) { bn := uint64(19) bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c") - da := NewPlasmaDAWithStorage(logger, pcfg, storage, l1F, &NoopMetrics{}) + da := NewPlasmaDAWithStorage(logger, pcfg, storage, &NoopMetrics{}) receipts := types.Receipts{&types.Receipt{ Type: 2, @@ -293,7 +293,7 @@ func TestFilterInvalidBlockNumber(t *testing.T) { l1F.ExpectFetchReceipts(bhash, nil, receipts, nil) // we get 1 logs successfully filtered as valid status updated contract event - logs, err := da.fetchChallengeLogs(ctx, id) + logs, err := da.fetchChallengeLogs(ctx, l1F, id) require.NoError(t, err) require.Equal(t, len(logs), 1) diff --git a/op-plasma/damock.go b/op-plasma/damock.go index 3beb3654cc99..f21e80df25e0 100644 --- a/op-plasma/damock.go +++ b/op-plasma/damock.go @@ -80,7 +80,7 @@ var ErrNotEnabled = errors.New("plasma not enabled") // PlasmaDisabled is a noop plasma DA implementation for stubbing. type PlasmaDisabled struct{} -func (d *PlasmaDisabled) GetInput(ctx context.Context, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { +func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { return nil, ErrNotEnabled } @@ -94,6 +94,6 @@ func (d *PlasmaDisabled) Finalize(ref eth.L1BlockRef) { func (d *PlasmaDisabled) OnFinalizedHeadSignal(f HeadSignalFn) { } -func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, blockId eth.BlockID) error { +func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error { return ErrNotEnabled } diff --git a/op-plasma/dastate.go b/op-plasma/dastate.go index 4fea48e6424f..b36471d4db61 100644 --- a/op-plasma/dastate.go +++ b/op-plasma/dastate.go @@ -171,16 +171,15 @@ func (s *State) Prune(bn uint64) { } else { bn = 0 } - for i := 0; i < len(s.comms); i++ { - c := s.comms[i] - if c.blockNumber < bn { - s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) - delete(s.commsByKey, string(c.key)) - } else { - // once we're past the given index, remove all commitments - s.comms = s.comms[i:] - break - } + if s.comms.Len() == 0 { + return + } + // only first element is the highest priority (lowest block number). + // next highest priority is swapped to the first position after a Pop. + for s.comms[0].blockNumber < bn { + c := heap.Pop(&s.comms).(*Commitment) + s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) + delete(s.commsByKey, string(c.key)) } }