From b23eb932189fd236e497208f2f91b1083815bf84 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 6 Nov 2024 07:12:31 +0800 Subject: [PATCH] enable CCC in L1 follower node when `--ccc` flag is set to generate row consumption --- core/blockchain.go | 25 ++++++++++++++++--------- eth/backend.go | 3 +++ rollup/da_syncer/da_syncer.go | 23 ++++++++++++++++++----- rollup/da_syncer/syncing_pipeline.go | 5 ++++- 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 62b23e0b18ad..10a32fa5d9b3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2025,15 +2025,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } -func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (WriteStatus, error) { +func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) { if !bc.chainmu.TryLock() { - return NonStatTy, errInsertionInterrupted + return nil, NonStatTy, errInsertionInterrupted } defer bc.chainmu.Unlock() statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps) if err != nil { - return NonStatTy, err + return nil, NonStatTy, err } statedb.StartPrefetcher("l1sync") @@ -2044,7 +2044,7 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig) if err != nil { - return NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err) } // TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique @@ -2056,7 +2056,7 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types err = bc.engine.Prepare(bc, header) if err != nil { - return NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err) } // we want to re-sign the block: set time to original value again. @@ -2073,19 +2073,19 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types if sign { resultCh, stopCh := make(chan *types.Block), make(chan struct{}) if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil { - return NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err) } // Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computational heavy // or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of // that artificially added delay and subtract it from overall runtime of commit(). fullBlock = <-resultCh if fullBlock == nil { - return NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64()) + return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64()) } // verify the generated block with local consensus engine to make sure everything is as expected if err = bc.engine.VerifyHeader(bc, fullBlock.Header()); err != nil { - return NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err) } } @@ -2105,7 +2105,14 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types l.BlockHash = blockHash } - return bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false) + // Double check: even though we just built the block, make sure it is valid. + if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err) + } + + writeStatus, err := bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false) + return fullBlock, writeStatus, err } // insertSideChain is called when an import batch hits upon a pruned ancestor diff --git a/eth/backend.go b/eth/backend.go index 71c1d0ebe822..ee27e064951a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -251,6 +251,9 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl // simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline // by waiting and retrying. if config.EnableDASyncing { + // Enable CCC if flag is set so that row consumption can be generated. + config.DA.CCCEnable = config.CheckCircuitCapacity + config.DA.CCCNumWorkers = config.CCCMaxWorkers eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA) if err != nil { return nil, fmt.Errorf("cannot initialize da syncer: %w", err) diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go index 602c6b04c400..825129292bd7 100644 --- a/rollup/da_syncer/da_syncer.go +++ b/rollup/da_syncer/da_syncer.go @@ -5,6 +5,7 @@ import ( "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/ccc" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) @@ -15,15 +16,22 @@ var ( ) type DASyncer struct { - l2EndBlock uint64 - blockchain *core.BlockChain + asyncChecker *ccc.AsyncChecker + l2EndBlock uint64 + blockchain *core.BlockChain } -func NewDASyncer(blockchain *core.BlockChain, l2EndBlock uint64) *DASyncer { - return &DASyncer{ +func NewDASyncer(blockchain *core.BlockChain, cccEnable bool, cccNumWorkers int, l2EndBlock uint64) *DASyncer { + s := &DASyncer{ l2EndBlock: l2EndBlock, blockchain: blockchain, } + + if cccEnable { + s.asyncChecker = ccc.NewAsyncChecker(blockchain, cccNumWorkers, false) + } + + return s } // SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain. @@ -50,10 +58,15 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber) } - if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign); err != nil { + fullBlock, _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign) + if err != nil { return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err) } + if s.asyncChecker != nil { + _ = s.asyncChecker.Check(fullBlock) + } + currentBlock = s.blockchain.CurrentBlock() if override && block.PartialHeader.Number != currentBlock.Number.Uint64() && block.PartialHeader.Number%100 == 0 { newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number) diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 34f6e46f1ffc..8135d1ba6cb3 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -27,6 +27,9 @@ type Config struct { BlockNativeAPIEndpoint string // BlockNative blob api endpoint BeaconNodeAPIEndpoint string // Beacon node api endpoint + CCCEnable bool // enable CCC verification and generation of row consumption + CCCNumWorkers int // number of workers for CCC verification + RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests) InitialBatch uint64 // Batch number from which to start syncing and overriding blocks @@ -99,7 +102,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory) batchQueue := NewBatchQueue(daQueue, db) blockQueue := NewBlockQueue(batchQueue) - daSyncer := NewDASyncer(blockchain, config.L2EndBlock) + daSyncer := NewDASyncer(blockchain, config.CCCEnable, config.CCCNumWorkers, config.L2EndBlock) ctx, cancel := context.WithCancel(ctx) return &SyncingPipeline{