Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sequencer auto recover when meet an unexpected shutdown #166

Merged
merged 14 commits into from
Nov 13, 2024
Merged
53 changes: 53 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,6 +2204,10 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
return 0, nil
}

func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, error) {
return bc.recoverStateAndSetHead(block)
}

// recoverAncestors finds the closest ancestor with available state and re-execute
// all the ancestor blocks since that.
// recoverAncestors is only used post-merge.
Expand Down Expand Up @@ -2709,6 +2713,55 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) {
return 0, err
}

// recoverStateAndSetHead attempts to recover the state of the blockchain by re-importing
// missing blocks and advancing the chain head. It ensures the state is available
// for the given block and its ancestors before updating the head.
func (bc *BlockChain) recoverStateAndSetHead(block *types.Block) (common.Hash, error) {
var (
hashes []common.Hash
numbers []uint64
parent = block
)
for parent != nil && !bc.HasState(parent.Root()) {
if bc.stateRecoverable(parent.Root()) {
if err := bc.triedb.Recover(parent.Root()); err != nil {
return common.Hash{}, err
}
break
}
hashes = append(hashes, parent.Hash())
numbers = append(numbers, parent.NumberU64())
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)

// If the chain is terminating, stop iteration
if bc.insertStopped() {
log.Debug("Abort during blocks iteration")
return common.Hash{}, errInsertionInterrupted
}
}
if parent == nil {
return common.Hash{}, errors.New("missing parent")
}
// Import all the pruned blocks to make the state available
for i := len(hashes) - 1; i >= 0; i-- {
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during blocks processing")
return common.Hash{}, errInsertionInterrupted
}
var b *types.Block
if i == 0 {
b = block
} else {
b = bc.GetBlock(hashes[i], numbers[i])
}
if _, err := bc.insertChain(types.Blocks{b}, true); err != nil {
return b.ParentHash(), err
}
}
return block.Hash(), nil
}

// SetBlockValidatorAndProcessorForTesting sets the current validator and processor.
// This method can be used to force an invalid blockchain to be verified for tests.
// This method is unsafe and should only be used before block import starts.
Expand Down
6 changes: 6 additions & 0 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
// If we already are busy generating this work, then we do not need
// to start a second process.
if api.localBlocks.has(id) {
payload := api.localBlocks.getWithoutStatus(id)
if payload != nil {
// fix is running, listening the status of fix routine
log.Info("step into listenFix")
api.eth.Miner().Worker().ListenFixCompletion(id, payload, args)
}
return valid(&id), nil
}
payload, err := api.eth.Miner().BuildPayload(args)
Expand Down
18 changes: 18 additions & 0 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
)

Expand Down Expand Up @@ -92,6 +93,23 @@ func (q *payloadQueue) get(id engine.PayloadID, full bool) *engine.ExecutionPayl
return nil
}

// getWithoutStatus retrieves a previously stored payload item or nil if it does not exist.
func (q *payloadQueue) getWithoutStatus(id engine.PayloadID) *miner.Payload {
q.lock.RLock()
defer q.lock.RUnlock()

for _, item := range q.payloads {
if item == nil {
log.Info("getting payload not found", "id", id)
return nil // no more items
}
if item.id == id {
return item.payload
}
}
return nil
}

// waitFull waits until the first full payload has been built for the specified payload id
// The method returns immediately if the payload is unknown.
func (q *payloadQueue) waitFull(id engine.PayloadID) error {
Expand Down
4 changes: 4 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1846,3 +1846,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta))
d.syncLogTime = time.Now()
}

func (d *Downloader) GetAllPeers() []*peerConnection {
return d.peers.AllPeers()
}
14 changes: 14 additions & 0 deletions eth/downloader/fetchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package downloader

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -113,3 +114,16 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou
return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil
}
}

func (d *Downloader) GetHeaderByHashFromPeer(peer *peerConnection, blockHash common.Hash) (*types.Header, error) {
headers, _, err := d.fetchHeadersByHash(peer, blockHash, 1, 0, false)
if err != nil {
return nil, fmt.Errorf("failed to fetch header from peer: %v", err)
}

if len(headers) == 0 {
return nil, fmt.Errorf("no headers returned for hash: %v", blockHash)
}

return headers[0], nil
}
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
if !cs.handler.chain.NoTries() && !cs.handler.chain.HasState(head.Root) {
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
log.Info("Reenabled snap sync as chain is stateless")
log.Info("Reenabled snap sync as chain is stateless", "lost block", block.Number.Uint64())
return downloader.SnapSync, td
}
// Nope, we're really full syncing
Expand Down
147 changes: 147 additions & 0 deletions miner/fix_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package miner

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log"
)

// FixManager manages the fix operation state and notification mechanism.
type FixManager struct {
bnoieh marked this conversation as resolved.
Show resolved Hide resolved
mutex sync.Mutex // Protects access to fix state
isFixInProgress bool // Tracks if a fix operation is in progress
fixChannels sync.Map // Stores fix state and notification channels
listenerStarted sync.Map // Tracks whether a listener goroutine has started for each payload ID
downloader *downloader.Downloader // Used to trigger BeaconSync operations

}

// FixResult holds the result of the fix operation
type FixResult struct {
Success bool
Err error
}

// NewFixManager initializes a FixManager with required dependencies
func NewFixManager(downloader *downloader.Downloader) *FixManager {
return &FixManager{
downloader: downloader,
}
}

// StartFix launches a goroutine to manage the fix process and tracks the fix state.
func (fm *FixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) {
fm.mutex.Lock()
defer fm.mutex.Unlock()

if !fm.isFixInProgress {
fm.isFixInProgress = true
resultChan := make(chan FixResult, 1) // Channel to capture fix result (success or error)
fm.fixChannels.Store(id, resultChan)

go func() {
defer func() {
fm.mutex.Lock()
fm.isFixInProgress = false
fm.mutex.Unlock()

if ch, ok := fm.fixChannels.Load(id); ok {
resultChan := ch.(chan FixResult)
close(resultChan)
}
}()
worker.fix(parentHash, resultChan) // processing fix logic
}()
}
}

// ListenFixCompletion listens for the completion of the fix process to avoid redundant goroutine starts.
//
// payload: The payload that will be updated after fix completion.
// args: The arguments required to retry the payload update.
func (fm *FixManager) ListenFixCompletion(worker *worker, id engine.PayloadID, payload *Payload, args *BuildPayloadArgs) {
ch, exists := fm.fixChannels.Load(id)
if !exists {
log.Info("Payload is not fixing or has been completed")
return
}

// Check if a listener goroutine has already been started
if _, listenerExists := fm.listenerStarted.LoadOrStore(id, true); listenerExists {
log.Info("Listener already started for payload", "payload", id)
return
}

go func() {
log.Info("Start waiting for fix completion")
result := <-ch.(chan FixResult) // Wait for the fix result

// Check the result and decide whether to retry the payload update
if result.Success {
if err := worker.retryPayloadUpdate(args, payload); err != nil {
log.Error("Failed to retry payload update after fix", "id", id, "err", err)
} else {
log.Info("Payload update after fix succeeded", "id", id)
}
} else {
log.Error("Fix failed, skipping payload update", "id", id, "err", result.Err)
}

// Clean up the fix state
fm.fixChannels.Delete(id)
fm.listenerStarted.Delete(id)
}()
}

// RecoverFromLocal attempts to recover the block and MPT data from the local chain.
//
// blockHash: The latest header(unsafe block) hash of the block to recover.
func (fm *FixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error {
block := w.chain.GetBlockByHash(blockHash)
if block == nil {
return fmt.Errorf("block not found in local chain")
}

log.Info("Fixing data for block", "block number", block.NumberU64())
latestValid, err := w.chain.RecoverStateAndSetHead(block)
if err != nil {
return fmt.Errorf("failed to recover state: %v", err)
}

log.Info("Recovered states up to block", "latestValid", latestValid)
return nil
}

// RecoverFromPeer attempts to retrieve the block header from peers and triggers BeaconSync if successful.
//
// blockHash: The latest header(unsafe block) hash of the block to recover.
func (fm *FixManager) RecoverFromPeer(blockHash common.Hash) error {
peers := fm.downloader.GetAllPeers()
if len(peers) == 0 {
return fmt.Errorf("no peers available")
}

var header *types.Header
var err error
for _, peer := range peers {
header, err = fm.downloader.GetHeaderByHashFromPeer(peer, blockHash)
if err == nil && header != nil {
break
}
log.Warn("Failed to retrieve header from peer", "err", err)
}

if header == nil {
return fmt.Errorf("failed to retrieve header from all valid peers")
}

log.Info("Successfully retrieved header from peer", "blockHash", blockHash)

fm.downloader.BeaconSync(downloader.FullSync, header, nil)
return nil
}
11 changes: 9 additions & 2 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
Expand Down Expand Up @@ -86,6 +87,7 @@ var DefaultMevConfig = MevConfig{
type Backend interface {
BlockChain() *core.BlockChain
TxPool() *txpool.TxPool
Downloader() *downloader.Downloader
}

type BackendWithHistoricalState interface {
Expand Down Expand Up @@ -300,6 +302,11 @@ func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) {
return miner.worker.buildPayload(args)
}

// Worker builds the payload according to the provided parameters.
func (miner *Miner) Worker() *worker {
return miner.worker
}

func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) {

env, err := miner.prepareSimulationEnv()
Expand Down
4 changes: 4 additions & 0 deletions miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (m *mockBackend) TxPool() *txpool.TxPool {
return m.txPool
}

func (m *mockBackend) Downloader() *downloader.Downloader {
return nil
}

func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) {
return nil, errors.New("not supported")
}
Expand Down
Loading
Loading