
Commit

extra
kcalvinalvin committed Apr 1, 2024
1 parent 91cdf0d commit 1c1e323
Showing 3 changed files with 361 additions and 23 deletions.
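This change wires neutrino's query package into netsync so that checkpointed blocks are fetched from several peers in parallel through a query.WorkManager, rather than one getdata batch at a time from the single sync peer. fetchHeaderBlocks now builds a list of getdata messages, wraps them in a checkpointedBlocksQuery, and hands them to the work manager; blocks that arrive out of order are parked in the new queuedBlocks map until the header at the front of headerList shows up. The snippet below is only a sketch of the query pattern the diff relies on; fetchOneBatch, its arguments, and the error handling are illustrative, not code from this commit (it uses btcd's wire package and neutrino's query package):

// Each query.Request pairs a message to send with a handler that reports
// query.Progress for every response seen from the peer that served it.
func fetchOneBatch(workMgr query.WorkManager, gdmsg *wire.MsgGetData, quit chan struct{}) {
	req := &query.Request{
		Req: gdmsg,
		HandleResp: func(req, resp wire.Message, peerAddr string) query.Progress {
			// Only block messages count as progress for a getdata batch.
			if _, ok := resp.(*wire.MsgBlock); !ok {
				return query.Progress{Finished: false, Progressed: false}
			}
			return query.Progress{Finished: true, Progressed: true}
		},
	}

	// Query spreads the requests over the connected peers and reports the
	// overall outcome on the returned error channel.
	errChan := workMgr.Query(
		[]*query.Request{req}, query.Cancel(quit), query.NoRetryMax(),
	)
	select {
	case err := <-errChan:
		if err != nil {
			// Handle or log the failed batch here.
		}
	case <-quit:
	}
}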
3 changes: 3 additions & 0 deletions netsync/interface.go
@@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/mempool"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"

(CI annotation on line 15: the GitHub Actions Build, Unit race, and Unit coverage jobs all fail with "no required module provides package github.com/lightninglabs/neutrino/query; to add it:", since the new import has not been added to go.mod yet.)
)

// PeerNotifier exposes methods to notify peers of status changes to
@@ -38,4 +39,6 @@ type Config struct {
MaxPeers int

FeeEstimator *mempool.FeeEstimator

ConnectedPeers func() (<-chan query.Peer, func(), error)
}
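The new ConnectedPeers hook on the netsync Config is what feeds peers into the work manager that manager.go (below) creates in New. A caller, presumably the server side of this change, supplies a closure that returns a channel of currently connected peers plus a cancel function. The following is only an illustrative sketch of such a closure built over a fixed peer slice; makeConnectedPeers and its behavior are invented for the example, not code from this commit:

// makeConnectedPeers is a hypothetical helper: it returns a closure matching
// the ConnectedPeers signature that replays the given peers and then blocks
// until the returned cancel function is called.
func makeConnectedPeers(current []query.Peer) func() (<-chan query.Peer, func(), error) {
	return func() (<-chan query.Peer, func(), error) {
		peerChan := make(chan query.Peer)
		quit := make(chan struct{})

		go func() {
			defer close(peerChan)
			for _, p := range current {
				select {
				case peerChan <- p:
				case <-quit:
					return
				}
			}
			// Block until cancelled; a real implementation would keep
			// forwarding newly connected peers here.
			<-quit
		}()

		return peerChan, func() { close(quit) }, nil
	}
}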
231 changes: 215 additions & 16 deletions netsync/manager.go
@@ -1,4 +1,4 @@
// Copyright (c) 2013-2017 The btcsuite developers
// Copyright (c) 2013-2017 The btcsuite developrs
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

@@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/mempool"
peerpkg "github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"
)

const (
@@ -173,6 +174,57 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
m[hash] = struct{}{}
}

type checkpointedBlocksQuery struct {
syncMgr *SyncManager
msgs []wire.Message
msgBlockChan chan *wire.MsgBlock
}

func (c *checkpointedBlocksQuery) handleResponse(req, resp wire.Message,
peerAddr string) query.Progress {

r, ok := resp.(*wire.MsgBlock)
if !ok {
// We are only looking for block messages.
return query.Progress{
Finished: false,
Progressed: false,
}
}

//log.Infof("received block %s", r.BlockHash().String())

select {
case c.msgBlockChan <- r:
//state := c.syncMgr.peerStates[peerAddr]
//log.Infof("got block %v from peer %v", r.BlockHash().String(), peerAddr)
case <-c.syncMgr.quit:
return query.Progress{
Finished: false,
Progressed: false,
}
}

//log.Infof("send query finish")
return query.Progress{
Finished: true,
Progressed: true,
}
}

// requests creates a query.Request for each of the queued messages, wiring
// in handleResponse as the response handler for every request.
func (c *checkpointedBlocksQuery) requests() []*query.Request {
reqs := make([]*query.Request, len(c.msgs))
for idx, m := range c.msgs {
reqs[idx] = &query.Request{
Req: m,
HandleResp: c.handleResponse,
}
}

return reqs
}

// SyncManager is used to communicate block related messages with peers. The
// SyncManager is started by executing Start() in a goroutine. Once started,
// it selects peers to sync from and starts the initial block download. Once the
@@ -190,6 +242,9 @@ type SyncManager struct {
wg sync.WaitGroup
quit chan struct{}

fetchManager query.WorkManager
queuedBlocks map[chainhash.Hash]*blockMsg

// These fields should only be accessed from the blockHandler thread
rejectedTxns map[chainhash.Hash]struct{}
requestedTxns map[chainhash.Hash]struct{}
@@ -376,6 +431,9 @@ func (sm *SyncManager) startSync() {
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()
} else {
//bestState := sm.chain.BestSnapshot()
//log.Warnf("No sync peer candidates available. Chain tip at %s(%d)",
// bestState.Hash.String(), bestState.Height)
log.Warnf("No sync peer candidates available")
}
}
@@ -688,15 +746,16 @@ func (sm *SyncManager) current() bool {
// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
_, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists = state.requestedBlocks[*blockHash]; !exists {
//if _, exists = state.requestedBlocks[*blockHash]; !exists {
if _, exists = sm.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
@@ -719,6 +778,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// properly.
isCheckpointBlock := false
behaviorFlags := blockchain.BFNone
//var nextBlocks []*blockMsg
if sm.headersFirstMode {
firstNodeEl := sm.headerList.Front()
if firstNodeEl != nil {
@@ -728,23 +788,73 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
isCheckpointBlock = true
} else {
//log.Infof("0 got block %s which is next in line", firstNode.hash)
sm.headerList.Remove(firstNodeEl)

firstNodeEl := sm.headerList.Front()
firstNode := firstNodeEl.Value.(*headerNode)
nextBlock, found := sm.queuedBlocks[*firstNode.hash]
if found {
defer func() {
sm.handleBlockMsg(nextBlock)
}()
}

//for firstNodeEl != nil {
// firstNode := firstNodeEl.Value.(*headerNode)
// log.Infof("1 looking for %s which the next block in line", firstNode.hash.String())

// nextBlock, found := sm.queuedBlocks[*firstNode.hash]
// if found {
// log.Infof("2 found next block %s", firstNode.hash.String())
// //nextBlocks = append(nextBlocks, nextBlock)

// sm.headerList.Remove(firstNodeEl)
// firstNodeEl = sm.headerList.Front()
// delete(sm.queuedBlocks, *firstNode.hash)
// } else {
// log.Infof("3 next block %s not found", firstNode.hash.String())
// firstNodeEl = nil
// }
//}
}
} else {
// Since we download blocks in parallel, the blocks
// requested may not come in order. Hold onto these
// blocks for now for later processing.
sm.queuedBlocks[*bmsg.block.Hash()] = bmsg
////sm.queuedBlocks[bmsg.block.MsgBlock().Header.PrevBlock] = bmsg
////delete(sm.requestedBlocks, *blockHash)
//log.Infof("storing block %s "+
// "for later processing", bmsg.block.Hash().String())
////log.Infof("storing block %s with key %s "+
//// "for later processing", bmsg.block.Hash().String(),
//// bmsg.block.MsgBlock().Header.PrevBlock.String())
return
}
}
}

//defer func() {
// log.Infof("defer")
// for i := range nextBlocks {
// nextBlocks[i].reply = nil
// sm.handleBlockMsg(nextBlocks[i])
// }
//}()

//log.Infof("sending block %s to be processed", blockHash.String())
// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(state.requestedBlocks, *blockHash)
//delete(state.requestedBlocks, *blockHash)
delete(sm.requestedBlocks, *blockHash)

// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
if err != nil {
// When the error is a rule error, it means the block was simply
//log.Infof("processed %s", bmsg.block.Hash().String())
if err != nil { // When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
// it as such. Otherwise, something really did go wrong, so log
// it as an actual error.
@@ -855,7 +965,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// getting short.
if !isCheckpointBlock {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
len(sm.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
}
return
@@ -900,12 +1010,29 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
func (sm *SyncManager) fetchHeaderBlocks() {
//log.Infof("fetchHeaderBlocks called")
// Nothing to do if there is no start header.
if sm.startHeader == nil {
log.Warnf("fetchHeaderBlocks called with no start header")
return
}

node, ok := sm.startHeader.Value.(*headerNode)
if !ok {
log.Warn("Header list node type is not a headerNode")
return
}

if node.height-1024 > sm.chain.BestSnapshot().Height {
log.Infof("return as current best chain is %d "+
"but startheader is %d",
sm.chain.BestSnapshot().Height,
node.height)
return
}

queryMessages := make([]wire.Message, 0, sm.headerList.Len())

// Build up a getdata request for the list of blocks the headers
// describe. The size hint will be limited to wire.MaxInvPerMsg by
// the function, so no need to double check it here.
@@ -918,19 +1045,15 @@ func (sm *SyncManager) fetchHeaderBlocks() {
continue
}

iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
iv := wire.NewInvVect(wire.InvTypeWitnessBlock, node.hash)
haveInv, err := sm.haveInventory(iv)
if err != nil {
log.Warnf("Unexpected failure when checking for "+
"existing inventory during header block "+
"fetch: %v", err)
}
if !haveInv {
syncPeerState := sm.peerStates[sm.syncPeer]

sm.requestedBlocks[*node.hash] = struct{}{}
syncPeerState.requestedBlocks[*node.hash] = struct{}{}

// If we're fetching from a witness enabled peer
// post-fork, then ensure that we receive all the
// witness data in the blocks.
@@ -940,15 +1063,61 @@

gdmsg.AddInvVect(iv)
numRequested++
queryMessages = append(queryMessages, gdmsg)
gdmsg = wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
}

sm.startHeader = e.Next()
if numRequested >= wire.MaxInvPerMsg {

if numRequested > 1024 {
break
}

//// Only batch 16 blocks per work.
//if numRequested != 0 && numRequested%1 == 0 {
// queryMessages = append(queryMessages, gdmsg)
// gdmsg = wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))

// if numRequested >= wire.MaxInvPerMsg {
// //queryMessages = append(queryMessages, gdmsg)
// break
// }
//}
}
if len(gdmsg.InvList) > 0 {
sm.syncPeer.QueueMessage(gdmsg, nil)

blockChan := make(chan *wire.MsgBlock, numRequested)
if len(queryMessages) == 0 {
return
}
q := checkpointedBlocksQuery{
syncMgr: sm,
msgs: queryMessages,
msgBlockChan: blockChan,
}
log.Infof("query messages: %d", len(queryMessages))

// Not sure if this is even needed. Maybe it's ok to just ignore stuff
// from the error channel since any errors in the blocks are gonna be
// caught anyways.
go func() {
errChan := sm.fetchManager.Query(
q.requests(),
query.Cancel(sm.quit),
query.NoRetryMax(),
)

select {
//case msgBlock := <-blockChan:
case err := <-errChan:
if err != nil {
log.Infof("err: %v", err)
}

case <-sm.quit:
return
}

}()
}

// handleHeadersMsg handles block header messages from all peers. Headers are
@@ -1551,6 +1720,24 @@ func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done
}

sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}

//if !sm.headersFirstMode {
// sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
// return
//}

//// Ignore QueueBlock requests on headers first.
//done <- struct{}{}

////for block.MsgBlock().Header.PrevBlock != sm.chain.BestSnapshot().Hash {
//// done <- struct{}{}

//// select {
//// case <-sm.quit:
//// }
////}

////sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
}

// QueueInv adds the passed inv message and peer to the block handling queue.
@@ -1605,6 +1792,10 @@ func (sm *SyncManager) Start() {
return
}

if err := sm.fetchManager.Start(); err != nil {
log.Info(err)
}

log.Trace("Starting sync manager")
sm.wg.Add(1)
go sm.blockHandler()
@@ -1675,7 +1866,15 @@ func New(config *Config) (*SyncManager, error) {
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
queuedBlocks: make(map[chainhash.Hash]*blockMsg),
fetchManager: query.NewWorkManager(
&query.Config{
ConnectedPeers: config.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
},
),
feeEstimator: config.FeeEstimator,
}

best := sm.chain.BestSnapshot()
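Because the work manager hands the getdata batches to several peers at once, blocks no longer arrive strictly in header order, which is why handleBlockMsg above parks early arrivals in queuedBlocks and replays the queued block that matches the next header in headerList. The snippet below is only a stripped-down illustration of that parking-and-replay idea; the reorderBuffer type and its fields are invented for the example (it uses btcd's chainhash and wire packages), not code from this commit:

// reorderBuffer keys parked blocks by hash and releases them only once they
// are next in the expected (checkpointed) header order.
type reorderBuffer struct {
	queued map[chainhash.Hash]*wire.MsgBlock
	order  []chainhash.Hash // remaining header hashes, in order
}

// add parks blk and returns every parked block that is now next in line, in
// the order those blocks should be processed.
func (rb *reorderBuffer) add(blk *wire.MsgBlock) []*wire.MsgBlock {
	rb.queued[blk.BlockHash()] = blk

	var ready []*wire.MsgBlock
	for len(rb.order) > 0 {
		next, ok := rb.queued[rb.order[0]]
		if !ok {
			break
		}
		ready = append(ready, next)
		delete(rb.queued, rb.order[0])
		rb.order = rb.order[1:]
	}
	return ready
}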