diff --git a/go.mod b/go.mod index 1f445d9065..3bd36f8652 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/jessevdk/go-flags v1.4.0 github.com/jrick/logrotate v1.0.0 + github.com/lightninglabs/neutrino v0.16.0 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 golang.org/x/crypto v0.22.0 diff --git a/go.sum b/go.sum index bb666c89de..3f3b346498 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0= +github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/netsync/manager.go b/netsync/manager.go index 3215a86ace..f437da1c4e 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/mempool" peerpkg "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/query" ) const ( @@ -173,6 +174,80 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) { m[hash] = struct{}{} } +// checkpointedBlocksQuery is a helper to construct query.Requests for GetData +// messages. +type checkpointedBlocksQuery struct { + msg *wire.MsgGetData + sm *SyncManager + blocks map[chainhash.Hash]struct{} +} + +// newCheckpointedBlocksQuery returns an initialized newCheckpointedBlocksQuery. +func newCheckpointedBlocksQuery(msg *wire.MsgGetData, + sm *SyncManager) checkpointedBlocksQuery { + + m := make(map[chainhash.Hash]struct{}, len(msg.InvList)) + for _, inv := range msg.InvList { + m[inv.Hash] = struct{}{} + } + return checkpointedBlocksQuery{msg, sm, m} +} + +// handleResponse returns that the progress is progressed and finished if the +// received wire.Message is a MsgBlock. +func (c *checkpointedBlocksQuery) handleResponse(req, resp wire.Message, + peerAddr string) query.Progress { + + block, ok := resp.(*wire.MsgBlock) + if !ok { + // We are only looking for block messages. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // If we didn't find this block in the map of blocks we're expecting, + // we're neither finished nor progressed. + hash := block.BlockHash() + _, found := c.blocks[hash] + if !found { + log.Warnf("Got unrequested block %v from %s -- disconnecting", + hash, peerAddr) + + return query.Progress{ + Finished: false, + Progressed: false, + } + } + delete(c.blocks, hash) + + // If we have blocks we're expecting, we've progressed but not finished. + if len(c.blocks) > 0 { + return query.Progress{ + Finished: false, + Progressed: true, + } + } + + // We have no more blocks we're expecting from heres so we're finished. + return query.Progress{ + Finished: true, + Progressed: true, + } +} + +// requests returns a slice of query.Request that can be queued to +// query.WorkManager. +func (c *checkpointedBlocksQuery) requests() []*query.Request { + req := &query.Request{ + Req: c.msg, + HandleResp: c.handleResponse, + } + + return []*query.Request{req} +} + // SyncManager is used to communicate block related messages with peers. The // SyncManager is started as by executing Start() in a goroutine. Once started, // it selects peers to sync from and starts the initial block download. Once the