Skip to content

Commit

Permalink
eth, les, light: Refactor downloader to use blockchain interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Arachnid committed Jun 28, 2017
1 parent dfd0762 commit 0550957
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 146 deletions.
143 changes: 82 additions & 61 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,11 @@ type Downloader struct {
syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields

lightchain LightChain
chain BlockChain

// Callbacks
hasHeader headerCheckFn // Checks if a header is present in the chain
hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
getHeader headerRetrievalFn // Retrieves a header from the chain
getBlock blockRetrievalFn // Retrieves a block from the chain
headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
headBlock headBlockRetrievalFn // Retrieves the head block from the chain
headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
getTd tdRetrievalFn // Retrieves the TD of a block from the chain
insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
rollback chainRollbackFn // Removes a batch of recently added chain links
dropPeer peerDropFn // Drops a peer for misbehaving
dropPeer peerDropFn // Drops a peer for misbehaving

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
Expand Down Expand Up @@ -163,11 +153,56 @@ type Downloader struct {
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

type LightChain interface {
// HasHeader verifies a header's presence in the local chain.
HasHeader(common.Hash) bool

// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header

// CurrentHeader retrieves the head header from the local chain.
CurrentHeader() *types.Header

// GetTdByHash returns the total difficulty of a local block.
GetTdByHash(common.Hash) *big.Int

// InsertHeaderChain inserts a batch of headers into the local chain.
InsertHeaderChain([]*types.Header, int) (int, error)

// Rollback removes a few recently added elements from the local chain.
Rollback([]common.Hash)
}

type BlockChain interface {
LightChain

// HasBlockAndState verifies block and associated states' presence in the local chain.
HasBlockAndState(common.Hash) bool

// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash(common.Hash) *types.Block

// CurrentBlock retrieves the head block from the local chain.
CurrentBlock() *types.Block

// CurrentFastBlock retrieves the head fast block from the local chain.
CurrentFastBlock() *types.Block

// FastSyncCommitHead directly commits the head block to a certain entity.
FastSyncCommitHead(common.Hash) error

// InsertChain inserts a batch of blocks into the local chain.
InsertChain(types.Blocks) (int, error)

// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}

dl := &Downloader{
mode: mode,
Expand All @@ -177,19 +212,8 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
stateDB: stateDb,
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
hasHeader: hasHeader,
hasBlockAndState: hasBlockAndState,
getHeader: getHeader,
getBlock: getBlock,
headHeader: headHeader,
headBlock: headBlock,
headFastBlock: headFastBlock,
commitHeadBlock: commitHeadBlock,
getTd: getTd,
insertHeaders: insertHeaders,
insertBlocks: insertBlocks,
insertReceipts: insertReceipts,
rollback: rollback,
chain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
Expand Down Expand Up @@ -223,11 +247,11 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
current := uint64(0)
switch d.mode {
case FullSync:
current = d.headBlock().NumberU64()
current = d.chain.CurrentBlock().NumberU64()
case FastSync:
current = d.headFastBlock().NumberU64()
current = d.chain.CurrentFastBlock().NumberU64()
case LightSync:
current = d.headHeader().Number.Uint64()
current = d.lightchain.CurrentHeader().Number.Uint64()
}
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
Expand Down Expand Up @@ -572,13 +596,13 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.headHeader().Number.Uint64()
floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()

p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
if d.mode == FullSync {
ceil = d.headBlock().NumberU64()
ceil = d.chain.CurrentBlock().NumberU64()
} else if d.mode == FastSync {
ceil = d.headFastBlock().NumberU64()
ceil = d.chain.CurrentFastBlock().NumberU64()
}
if ceil >= MaxForkAncestry {
floor = int64(ceil - MaxForkAncestry)
Expand Down Expand Up @@ -638,7 +662,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
continue
}
// Otherwise check if we already know the header or not
if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
if (d.mode == FullSync && d.chain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()

// If every header is known, even future ones, the peer straight out lied about its head
Expand Down Expand Up @@ -703,11 +727,11 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
arrived = true

// Modify the search interval based on the response
if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
if (d.mode == FullSync && !d.chain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash())) {
end = check
break
}
header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
Expand Down Expand Up @@ -1124,23 +1148,19 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
for i, header := range rollback {
hashes[i] = header.Hash()
}
lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
if d.headFastBlock != nil {
lastFastBlock = d.headFastBlock().Number()
}
if d.headBlock != nil {
lastBlock = d.headBlock().Number()
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if d.mode != LightSync {
lastFastBlock = d.chain.CurrentFastBlock().Number()
lastBlock = d.chain.CurrentBlock().Number()
}
d.rollback(hashes)
d.lightchain.Rollback(hashes)
curFastBlock, curBlock := common.Big0, common.Big0
if d.headFastBlock != nil {
curFastBlock = d.headFastBlock().Number()
}
if d.headBlock != nil {
curBlock = d.headBlock().Number()
if d.mode != LightSync {
curFastBlock = d.chain.CurrentFastBlock().Number()
curBlock = d.chain.CurrentBlock().Number()
}
log.Warn("Rolled back headers", "count", len(hashes),
"header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))

Expand Down Expand Up @@ -1190,7 +1210,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if d.mode != LightSync {
if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
if !gotHeaders && td.Cmp(d.chain.GetTdByHash(d.chain.CurrentBlock().Hash())) > 0 {
return errStallingPeer
}
}
Expand All @@ -1202,7 +1222,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 {
return errStallingPeer
}
}
Expand Down Expand Up @@ -1232,7 +1252,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
for _, header := range chunk {
if !d.hasHeader(header.Hash()) {
if !d.lightchain.HasHeader(header.Hash()) {
unknown = append(unknown, header)
}
}
Expand All @@ -1241,7 +1261,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.insertHeaders(chunk, frequency); err != nil {
if n, err := d.chain.InsertHeaderChain(chunk, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollback = append(rollback, chunk[:n]...)
Expand Down Expand Up @@ -1328,7 +1348,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
for i, result := range results[:items] {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
if index, err := d.insertBlocks(blocks); err != nil {
if index, err := d.chain.InsertChain(blocks); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
Expand Down Expand Up @@ -1368,6 +1388,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync.Cancel()
if err := d.commitPivotBlock(P); err != nil {
return err

}
}
if err := d.importBlockResults(afterP); err != nil {
Expand Down Expand Up @@ -1416,7 +1437,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
receipts[i] = result.Receipts
}
if index, err := d.insertReceipts(blocks, receipts); err != nil {
if index, err := d.chain.InsertReceiptChain(blocks, receipts); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
Expand All @@ -1434,10 +1455,10 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
return err
}
log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
if _, err := d.chain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
return err
}
return d.commitHeadBlock(b.Hash())
return d.chain.FastSyncCommitHead(b.Hash())
}

// DeliverHeaders injects a new batch of block headers received from a remote
Expand Down
Loading

0 comments on commit 0550957

Please sign in to comment.