From 32d7f0cf203befa3cd939e54d4bf16195c26a5e2 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 6 Aug 2024 17:53:22 +0900 Subject: [PATCH] peer: make peer meet query.Peer interface query.Peer is used for downloading blocks out of order during headers first download. Methods SubscribeRecvMsg() and OnDisconnect() are added to abide by the interface. --- peer/peer.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/peer/peer.go b/peer/peer.go index 195fc0b4fe..6af323873d 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -494,6 +494,11 @@ type Peer struct { queueQuit chan struct{} outQuit chan struct{} quit chan struct{} + + // subscribers is a channel for relaying all messages that were received + // to this peer. + subscribers map[recvMsgsubscription]struct{} + subscriberLock sync.Mutex } // String returns the peer's address and directionality as a human-readable @@ -1098,6 +1103,35 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, return msg, buf, nil } +// recvMsgsubscription is two channels for a subscriber of recevied messages. +// msgChan for sending the messages and quit for cancelling the subscription. +type recvMsgsubscription struct { + msgChan chan wire.Message + quit chan struct{} +} + +// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin +// messages received from this peer will be sent on the returned +// channel. A closure is also returned, that should be called to cancel +// the subscription. +func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { + p.subscriberLock.Lock() + defer p.subscriberLock.Unlock() + + // No need to buffer this channel as we'll spin up a new goroutine for + // every send. + msgChan := make(chan wire.Message) + quit := make(chan struct{}) + sub := recvMsgsubscription{ + msgChan, + quit, + } + + p.subscribers[sub] = struct{}{} + + return msgChan, func() { close(quit) } +} + // writeMessage sends a bitcoin message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { // Don't do anything if we're disconnecting. @@ -1446,6 +1480,27 @@ out: } break out } + + // Send the received message to all the subscribers. + for sub := range p.subscribers { + select { + case <-sub.quit: + delete(p.subscribers, sub) + continue + default: + } + + // Spin up a goroutine so that we don't block here. + go func(subscription chan wire.Message, + quit chan struct{}) { + + select { + case subscription <- rmsg: + case <-p.quit: + } + + }(sub.msgChan, p.quit) + } atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} @@ -1961,6 +2016,12 @@ func (p *Peer) Disconnect() { close(p.quit) } +// OnDisconnect returns a channel that will be closed when this peer is +// disconnected. +func (p *Peer) OnDisconnect() <-chan struct{} { + return p.quit +} + // readRemoteVersionMsg waits for the next message to arrive from the remote // peer. If the next message is not a version message or the version is not // acceptable then return an error. @@ -2397,6 +2458,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer { queueQuit: make(chan struct{}), outQuit: make(chan struct{}), quit: make(chan struct{}), + subscribers: make(map[recvMsgsubscription]struct{}), cfg: cfg, // Copy so caller can't mutate. services: cfg.Services, protocolVersion: cfg.ProtocolVersion,