Skip to content

Commit

Permalink
netsync: add checkpointedBlocksQuery
Browse files Browse the repository at this point in the history
checkpointedBlocksQuery is a helper to create []*query.Request which can
be passed off to query.Workmanager to query for wire.Messages to
multiple peers.  This is useful for downloading blocks out of order from
multiple peers during ibd.
  • Loading branch information
kcalvinalvin committed Dec 26, 2024
1 parent b1b1e95 commit 4b80f6b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/jessevdk/go-flags v1.4.0
github.com/jrick/logrotate v1.0.0
github.com/lightninglabs/neutrino v0.16.0
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
golang.org/x/crypto v0.22.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0=
github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
75 changes: 75 additions & 0 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/mempool"
peerpkg "github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"
)

const (
Expand Down Expand Up @@ -173,6 +174,80 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
m[hash] = struct{}{}
}

// checkpointedBlocksQuery is a helper to construct query.Requests for GetData
// messages.
type checkpointedBlocksQuery struct {
msg *wire.MsgGetData
sm *SyncManager
blocks map[chainhash.Hash]struct{}
}

// newCheckpointedBlocksQuery returns an initialized newCheckpointedBlocksQuery.
func newCheckpointedBlocksQuery(msg *wire.MsgGetData,
sm *SyncManager) checkpointedBlocksQuery {

m := make(map[chainhash.Hash]struct{}, len(msg.InvList))
for _, inv := range msg.InvList {
m[inv.Hash] = struct{}{}
}
return checkpointedBlocksQuery{msg, sm, m}
}

// handleResponse returns that the progress is progressed and finished if the
// received wire.Message is a MsgBlock.
func (c *checkpointedBlocksQuery) handleResponse(req, resp wire.Message,
peerAddr string) query.Progress {

block, ok := resp.(*wire.MsgBlock)
if !ok {
// We are only looking for block messages.
return query.Progress{
Finished: false,
Progressed: false,
}
}

// If we didn't find this block in the map of blocks we're expecting,
// we're neither finished nor progressed.
hash := block.BlockHash()
_, found := c.blocks[hash]
if !found {
log.Warnf("Got unrequested block %v from %s -- disconnecting",
hash, peerAddr)

return query.Progress{
Finished: false,
Progressed: false,
}
}
delete(c.blocks, hash)

// If we have blocks we're expecting, we've progressed but not finished.
if len(c.blocks) > 0 {
return query.Progress{
Finished: false,
Progressed: true,
}
}

// We have no more blocks we're expecting from heres so we're finished.
return query.Progress{
Finished: true,
Progressed: true,
}
}

// requests returns a slice of query.Request that can be queued to
// query.WorkManager.
func (c *checkpointedBlocksQuery) requests() []*query.Request {
req := &query.Request{
Req: c.msg,
HandleResp: c.handleResponse,
}

return []*query.Request{req}
}

// SyncManager is used to communicate block related messages with peers. The
// SyncManager is started as by executing Start() in a goroutine. Once started,
// it selects peers to sync from and starts the initial block download. Once the
Expand Down

0 comments on commit 4b80f6b

Please sign in to comment.