fix: adjust plasma state pruning and use bool for DA resetting flag
tchardin committed Mar 12, 2024
1 parent 767af5a commit 73ce442
Showing 9 changed files with 59 additions and 53 deletions.
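At a glance: the DA manager no longer holds an L1Fetcher from construction; every method that needs L1 data (GetInput, AdvanceL1Origin, LoadChallengeEvents, LookAhead, fetchChallengeLogs) now receives it as an argument, so the fetcher in use always matches the confirmation depth of the pipeline driving it. The resetting counter becomes a plain bool, and State.Prune is rewritten around heap pops instead of slice re-slicing. Roughly, for callers (a sketch assembled from the call sites below; names such as l1F, blockID and plasmaCfg are illustrative):

    // before this commit: the L1 client is bound once, at construction time
    daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, l1F, &plasma.NoopMetrics{})
    err := daMgr.AdvanceL1Origin(ctx, blockID)

    // after this commit: the L1Fetcher travels with every call
    daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, &plasma.NoopMetrics{})
    err := daMgr.AdvanceL1Origin(ctx, l1F, blockID)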
4 changes: 2 additions & 2 deletions op-e2e/actions/plasma_test.go
@@ -76,7 +76,7 @@ func NewL2PlasmaDA(t Testing, params ...PlasmaParam) *L2PlasmaDA {
plasmaCfg, err := sd.RollupCfg.PlasmaConfig()
require.NoError(t, err)

- daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, l1F, &plasma.NoopMetrics{})
+ daMgr := plasma.NewPlasmaDAWithStorage(log, plasmaCfg, storage, &plasma.NoopMetrics{})

sequencer := NewL2Sequencer(t, log, l1F, nil, daMgr, engCl, sd.RollupCfg, 0)
miner.ActL1SetFeeRecipient(common.Address{'A'})
@@ -134,7 +134,7 @@ func (a *L2PlasmaDA) NewVerifier(t Testing) *L2Verifier {
l1F, err := sources.NewL1Client(a.miner.RPCClient(), a.log, nil, sources.L1ClientDefaultConfig(a.sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err)

- daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, l1F, &plasma.NoopMetrics{})
+ daMgr := plasma.NewPlasmaDAWithStorage(a.log, a.plasmaCfg, a.storage, &plasma.NoopMetrics{})

verifier := NewL2Verifier(t, a.log, l1F, nil, daMgr, engCl, a.sd.RollupCfg, &sync.Config{}, safedb.Disabled)

2 changes: 1 addition & 1 deletion op-node/node/node.go
@@ -391,7 +391,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
if cfg.Plasma.Enabled && err != nil {
return fmt.Errorf("failed to get plasma config: %w", err)
}
- plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.l1Source, n.metrics.PlasmaMetrics)
+ plasmaDA := plasma.NewPlasmaDA(n.log, cfg.Plasma, rpCfg, n.metrics.PlasmaMetrics)
if cfg.SafeDBPath != "" {
n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath)
safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath)
8 changes: 5 additions & 3 deletions op-node/rollup/derive/data_source.go
@@ -19,6 +19,8 @@ type DataIter interface {

type L1TransactionFetcher interface {
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
+ FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
+ L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}

type L1BlobsFetcher interface {
@@ -28,9 +30,9 @@ type L1BlobsFetcher interface {

type PlasmaInputFetcher interface {
// GetInput fetches the input for the given commitment at the given block number from the DA storage service.
- GetInput(ctx context.Context, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error)
+ GetInput(ctx context.Context, l1 plasma.L1Fetcher, c plasma.Keccak256Commitment, blockId eth.BlockID) (eth.Data, error)
// AdvanceL1Origin advances the L1 origin to the given block number, syncing the DA challenge events.
- AdvanceL1Origin(ctx context.Context, blockId eth.BlockID) error
+ AdvanceL1Origin(ctx context.Context, l1 plasma.L1Fetcher, blockId eth.BlockID) error
// Reset the challenge origin in case of L1 reorg
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
// Notify L1 finalized head so plasma finality is always behind L1
@@ -82,7 +84,7 @@ func (ds *DataSourceFactory) OpenData(ctx context.Context, ref eth.L1BlockRef, b
}
if ds.dsCfg.plasmaEnabled {
// plasma([calldata | blobdata](l1Ref)) -> data
- return NewPlasmaDataSource(ds.log, src, ds.plasmaFetcher, ref.ID()), nil
+ return NewPlasmaDataSource(ds.log, src, ds.fetcher, ds.plasmaFetcher, ref.ID()), nil
}
return src, nil
}
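Because L1TransactionFetcher now also demands FetchReceipts and L1BlockRefByNumber, any hand-rolled fake used to drive OpenData in a test has to grow with it. A minimal sketch of such a fake, with signatures copied from the interface above (purely illustrative; the repository's own tests use the shared testutils mocks instead):

    package derive_test

    import (
        "context"

        "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/core/types"

        "github.com/ethereum-optimism/optimism/op-service/eth"
    )

    // fakeL1Fetcher returns canned data for every method of the widened
    // L1TransactionFetcher interface. Not part of this commit.
    type fakeL1Fetcher struct {
        info     eth.BlockInfo
        txs      types.Transactions
        receipts types.Receipts
        ref      eth.L1BlockRef
    }

    func (f *fakeL1Fetcher) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) {
        return f.info, f.txs, nil
    }

    func (f *fakeL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
        return f.info, f.receipts, nil
    }

    func (f *fakeL1Fetcher) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) {
        return f.ref, nil
    }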
10 changes: 6 additions & 4 deletions op-node/rollup/derive/plasma_data_source.go
@@ -16,16 +16,18 @@ type PlasmaDataSource struct {
log log.Logger
src DataIter
fetcher PlasmaInputFetcher
+ l1 L1TransactionFetcher
id eth.BlockID
// keep track of a pending commitment so we can keep trying to fetch the input.
comm plasma.Keccak256Commitment
}

- func NewPlasmaDataSource(log log.Logger, src DataIter, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource {
+ func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1TransactionFetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource {
return &PlasmaDataSource{
log: log,
src: src,
fetcher: fetcher,
+ l1: l1,
id: id,
}
}
@@ -35,7 +37,7 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
// before we can proceed to fetch the input data. This function can be called multiple times
// for the same origin and no-ops if the origin was already processed. It is also called if
// there is no commitment in the current origin.
- if err := s.fetcher.AdvanceL1Origin(ctx, s.id); err != nil {
+ if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil {
if errors.Is(err, plasma.ErrReorgRequired) {
return nil, NewResetError(fmt.Errorf("new expired challenge"))
}
@@ -58,15 +60,15 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
s.comm = comm
}
// use the commitment to fetch the input from the plasma DA provider.
- data, err := s.fetcher.GetInput(ctx, s.comm, s.id)
+ data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id)
// GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager
// continued syncing origins detached from the pipeline origin.
if errors.Is(err, plasma.ErrReorgRequired) {
// challenge for a new previously derived commitment expired.
return nil, NewResetError(err)
} else if errors.Is(err, plasma.ErrExpiredChallenge) {
// this commitment was challenged and the challenge expired.
- s.log.Warn("challenge expired, skipping batch", "comm", fmt.Sprintf("%x", s.comm))
+ s.log.Warn("challenge expired, skipping batch", "comm", s.comm)
s.comm = nil
// skip the input
return s.Next(ctx)
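For orientation, a PlasmaDataSource is consumed like any other DataIter in the derivation pipeline: the caller keeps invoking Next until io.EOF signals that the current L1 origin is exhausted, while reset and temporary errors are left to the pipeline driver. A sketch of that loop (illustrative only, not code from this commit; assumes the usual context/errors/io imports plus the derive and op-service/eth packages):

    // drainOrigin pulls every batcher input out of one L1 origin.
    // Reset and temporary errors are returned to the caller (the pipeline driver).
    func drainOrigin(ctx context.Context, src derive.DataIter) ([]eth.Data, error) {
        var out []eth.Data
        for {
            data, err := src.Next(ctx)
            if errors.Is(err, io.EOF) {
                return out, nil // no more data in this origin
            }
            if err != nil {
                return nil, err // e.g. a reset error after an expired challenge
            }
            out = append(out, data)
        }
    }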
6 changes: 3 additions & 3 deletions op-node/rollup/derive/plasma_data_source_test.go
@@ -58,7 +58,7 @@ func TestPlasmaDataSource(t *testing.T) {

daState := plasma.NewState(logger, metrics)

- da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, l1F, metrics, daState)
+ da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState)

finalitySignal := &MockFinalitySignal{}
da.OnFinalizedHeadSignal(finalitySignal.OnFinalized)
@@ -293,7 +293,7 @@ func TestPlasmaDataSourceStall(t *testing.T) {

daState := plasma.NewState(logger, metrics)

- da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, l1F, metrics, daState)
+ da := plasma.NewPlasmaDAWithState(logger, pcfg, storage, metrics, daState)

finalitySignal := &MockFinalitySignal{}
da.OnFinalizedHeadSignal(finalitySignal.OnFinalized)
@@ -411,7 +411,7 @@ func TestPlasmaDataSourceInvalidData(t *testing.T) {
ChallengeWindow: 90, ResolveWindow: 90,
}

- da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, l1F, &plasma.NoopMetrics{})
+ da := plasma.NewPlasmaDAWithStorage(logger, pcfg, storage, &plasma.NoopMetrics{})

// Create rollup genesis and config
l1Time := uint64(2)
55 changes: 29 additions & 26 deletions op-plasma/damgr.go
@@ -63,8 +63,8 @@ type DA struct {
metrics Metricer

storage DAStorage
- l1 L1Fetcher

+ // the DA state keeps track of all the commitments and their challenge status.
state *State

// the latest l1 block we synced challenge contract events from
@@ -74,35 +74,34 @@ type DA struct {
// the latest recorded finalized head as per the l1 finalization signal
l1FinalizedHead eth.L1BlockRef
// flag the reset function we are resetting because of an expired challenge
- resetting int
+ resetting bool

finalizedHeadSignalFunc HeadSignalFn
}

// NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig.
- func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, l1f L1Fetcher, metrics Metricer) *DA {
- return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), l1f, metrics)
+ func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
+ return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
}

// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface.
- func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, l1f L1Fetcher, metrics Metricer) *DA {
+ func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metrics Metricer) *DA {
return &DA{
log: log,
cfg: cfg,
storage: storage,
- l1: l1f,
metrics: metrics,
state: NewState(log, metrics),
}
}

// NewPlasmaDAWithState creates a plasma DA from an initial state, used for testing in isolation.
- func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, l1f L1Fetcher, metrics Metricer, state *State) *DA {
+ // We pass the L1Fetcher to each method so it is kept in sync with the conf depth of the pipeline.
+ func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics Metricer, state *State) *DA {
return &DA{
log: log,
cfg: cfg,
storage: storage,
- l1: l1f,
metrics: metrics,
state: state,
}
@@ -138,20 +137,24 @@ func (d *DA) Finalize(l1Finalized eth.L1BlockRef) {
// LookAhead increments the challenge origin and processes the new block if it exists.
// It is used when the derivation pipeline stalls due to missing data and we need to continue
// syncing challenge events until the challenge is resolved or expires.
- func (d *DA) LookAhead(ctx context.Context) error {
- blkRef, err := d.l1.L1BlockRefByNumber(ctx, d.origin.Number+1)
+ func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error {
+ blkRef, err := l1.L1BlockRefByNumber(ctx, d.origin.Number+1)
// temporary error, will do a backoff
if err != nil {
return err
}
- return d.AdvanceL1Origin(ctx, blkRef.ID())
+ return d.AdvanceL1Origin(ctx, l1, blkRef.ID())
}

// Reset the challenge event derivation origin in case of L1 reorg
func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error {
- // resetting due to expired challenge, do not clear state
- if d.resetting > 0 {
- d.resetting--
+ // resetting due to expired challenge, do not clear state.
+ // If the DA source returns ErrReset, the pipeline is forced to reset by the rollup driver.
+ // In that case the Reset function will be called immediately, BEFORE the pipeline can
+ // call any further stage to step. Thus the state will NOT be cleared if the reset originates
+ // from this stage of the pipeline.
+ if d.resetting {
+ d.resetting = false
} else {
// resetting due to L1 reorg, clear state
d.origin = base.ID()
Expand All @@ -162,7 +165,7 @@ func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemC

// GetInput returns the input data for the given commitment bytes. blockNumber is required to look up
// the challenge status in the DataAvailabilityChallenge L1 contract.
- func (d *DA) GetInput(ctx context.Context, comm Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
+ func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
// If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a
// challenge event for this commitment. Otherwise we mark the commitment as part of the canonical
// chain so potential future challenge events can be selected.
@@ -189,7 +192,7 @@ func (d *DA) GetInput(ctx context.Context, comm Keccak256Commitment, blockId eth
} else if notFound {
// data is missing and a challenge is active, we must wait for the challenge to resolve
// hence we continue syncing new origins to sync the new challenge events.
- if err := d.LookAhead(ctx); err != nil {
+ if err := d.LookAhead(ctx, l1); err != nil {
return nil, err
}
return nil, ErrPendingChallenge
@@ -215,7 +218,7 @@ func (d *DA) GetInput(ctx context.Context, comm Keccak256Commitment, blockId eth
return nil, ErrMissingPastWindow
} else {
// continue syncing challenges in the hope that the commitment is eventually challenged and resolved
- if err := d.LookAhead(ctx); err != nil {
+ if err := d.LookAhead(ctx, l1); err != nil {
return nil, err
}
return nil, ErrPendingChallenge
@@ -235,27 +238,27 @@ func (d *DA) isExpired(bn uint64) bool {
// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block
// as the new head for tracking challenges. It forwards an error if any new challenges have expired to
// trigger a derivation reset.
- func (d *DA) AdvanceL1Origin(ctx context.Context, block eth.BlockID) error {
+ func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// do not repeat for the same origin
if block.Number <= d.origin.Number {
return nil
}
// sync challenges for the given block ID
- if err := d.LoadChallengeEvents(ctx, block); err != nil {
+ if err := d.LoadChallengeEvents(ctx, l1, block); err != nil {
return err
}
// advance challenge window, computing the finalized head
bn, err := d.state.ExpireChallenges(block.Number)
if err != nil {
// warn the reset function not to clear the state
- d.resetting++
+ d.resetting = true
return err
}

// finalized head signal is called only when the finalized head number increases
// and the l1 finalized head is ahead of the DA finalized head.
if bn > d.finalizedHead.Number {
- ref, err := d.l1.L1BlockRefByNumber(ctx, bn)
+ ref, err := l1.L1BlockRefByNumber(ctx, bn)
if err != nil {
return err
}
@@ -273,9 +276,9 @@ func (d *DA) AdvanceL1Origin(ctx context.Context, block eth.BlockID) error {
}

// LoadChallengeEvents fetches the l1 block receipts and updates the challenge status
- func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error {
+ func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error {
// filter any challenge event logs in the block
- logs, err := d.fetchChallengeLogs(ctx, block)
+ logs, err := d.fetchChallengeLogs(ctx, l1, block)
if err != nil {
return err
}
@@ -290,7 +293,7 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error {
switch status {
case ChallengeResolved:
// cached with input resolution call so not expensive
- _, txs, err := d.l1.InfoAndTxsByHash(ctx, block.Hash)
+ _, txs, err := l1.InfoAndTxsByHash(ctx, block.Hash)
if err != nil {
d.log.Error("failed to fetch l1 block", "block", block.Number, "err", err)
continue
@@ -330,9 +333,9 @@ func (d *DA) LoadChallengeEvents(ctx context.Context, block eth.BlockID) error {
}

// fetchChallengeLogs returns logs for challenge events if any for the given block
- func (d *DA) fetchChallengeLogs(ctx context.Context, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive
+ func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive
var logs []*types.Log
- _, receipts, err := d.l1.FetchReceipts(ctx, block.Hash)
+ _, receipts, err := l1.FetchReceipts(ctx, block.Hash)
if err != nil {
return nil, err
}
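With the l1 struct field gone, every entry point of the DA manager takes the L1Fetcher explicitly. Judging only from the call sites in this file, that interface must cover at least the three methods below; this is a reconstruction for reference (imports elided, the authoritative definition lives elsewhere in op-plasma and may include more):

    // Approximate shape of plasma.L1Fetcher as used by damgr.go (illustrative only).
    type L1Fetcher interface {
        L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error)
        InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
        FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
    }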
4 changes: 2 additions & 2 deletions op-plasma/damgr_test.go
@@ -261,7 +261,7 @@ func TestFilterInvalidBlockNumber(t *testing.T) {
bn := uint64(19)
bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c")

- da := NewPlasmaDAWithStorage(logger, pcfg, storage, l1F, &NoopMetrics{})
+ da := NewPlasmaDAWithStorage(logger, pcfg, storage, &NoopMetrics{})

receipts := types.Receipts{&types.Receipt{
Type: 2,
@@ -293,7 +293,7 @@ func TestFilterInvalidBlockNumber(t *testing.T) {
l1F.ExpectFetchReceipts(bhash, nil, receipts, nil)

// we get 1 logs successfully filtered as valid status updated contract event
- logs, err := da.fetchChallengeLogs(ctx, id)
+ logs, err := da.fetchChallengeLogs(ctx, l1F, id)
require.NoError(t, err)
require.Equal(t, len(logs), 1)

4 changes: 2 additions & 2 deletions op-plasma/damock.go
@@ -80,7 +80,7 @@ var ErrNotEnabled = errors.New("plasma not enabled")
// PlasmaDisabled is a noop plasma DA implementation for stubbing.
type PlasmaDisabled struct{}

- func (d *PlasmaDisabled) GetInput(ctx context.Context, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
+ func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) {
return nil, ErrNotEnabled
}

@@ -94,6 +94,6 @@ func (d *PlasmaDisabled) Finalize(ref eth.L1BlockRef) {
func (d *PlasmaDisabled) OnFinalizedHeadSignal(f HeadSignalFn) {
}

- func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, blockId eth.BlockID) error {
+ func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error {
return ErrNotEnabled
}
19 changes: 9 additions & 10 deletions op-plasma/dastate.go
@@ -171,16 +171,15 @@ func (s *State) Prune(bn uint64) {
} else {
bn = 0
}
- for i := 0; i < len(s.comms); i++ {
- c := s.comms[i]
- if c.blockNumber < bn {
- s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
- delete(s.commsByKey, string(c.key))
- } else {
- // once we're past the given index, remove all commitments
- s.comms = s.comms[i:]
- break
- }
+ if s.comms.Len() == 0 {
+ return
+ }
+ // only first element is the highest priority (lowest block number).
+ // next highest priority is swapped to the first position after a Pop.
+ for s.comms[0].blockNumber < bn {
+ c := heap.Pop(&s.comms).(*Commitment)
+ s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber)
+ delete(s.commsByKey, string(c.key))
}
}

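The rewritten Prune treats s.comms as a min-heap ordered by blockNumber: heap.Pop keeps handing back the oldest commitment until the head of the queue is at or past the pruning block. Note that the loop reads s.comms[0] again after every Pop, so it relies on the queue never being fully drained; the standalone sketch below adds a Len() > 0 guard for that edge case. Type and field names are illustrative, not necessarily those in dastate.go:

    package main

    import (
        "container/heap"
        "fmt"
    )

    // commitment mirrors the fields Prune cares about.
    type commitment struct {
        key         []byte
        blockNumber uint64
        expiresAt   uint64
    }

    // commQueue is a min-heap of commitments keyed by their L1 inclusion block.
    type commQueue []*commitment

    func (q commQueue) Len() int           { return len(q) }
    func (q commQueue) Less(i, j int) bool { return q[i].blockNumber < q[j].blockNumber }
    func (q commQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
    func (q *commQueue) Push(x any)        { *q = append(*q, x.(*commitment)) }
    func (q *commQueue) Pop() any {
        old := *q
        n := len(old)
        c := old[n-1]
        old[n-1] = nil
        *q = old[:n-1]
        return c
    }

    func main() {
        q := commQueue{{blockNumber: 5}, {blockNumber: 2}, {blockNumber: 9}}
        heap.Init(&q)
        // prune everything included before block 6, the same walk Prune performs
        bn := uint64(6)
        for q.Len() > 0 && q[0].blockNumber < bn {
            c := heap.Pop(&q).(*commitment)
            fmt.Println("pruned commitment from block", c.blockNumber)
        }
        fmt.Println("remaining:", q.Len()) // 1: only the block-9 commitment survives
    }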
