Skip to content

Commit

Permalink
peer: make peer meet query.Peer interface
Browse files Browse the repository at this point in the history
query.Peer is used for downloading blocks out of order during headers
first download.  Methods SubscribeRecvMsg() and OnDisconnect() are added
to abide by the interface.
  • Loading branch information
kcalvinalvin committed Dec 10, 2024
1 parent ec0b90d commit 06e6b63
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,10 @@ type Peer struct {
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

// subscribers is a channel for relaying all messages that were received
// to this peer.
subscribers []chan wire.Message
}

// String returns the peer's address and directionality as a human-readable
Expand Down Expand Up @@ -1098,6 +1102,24 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return msg, buf, nil
}

// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
// the subscription.
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
msgChan := make(chan wire.Message, 1)
p.subscribers = append(p.subscribers, msgChan)

// Cancellation is just removing the channel from the subscribers list.
idx := len(p.subscribers) - 1
cancel := func() {
p.subscribers = append(p.subscribers[:idx],
p.subscribers[idx+1:]...)
}

return msgChan, cancel
}

// writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting.
Expand Down Expand Up @@ -1402,6 +1424,10 @@ out:
// needed.
rmsg, buf, err := p.readMessage(p.wireEncoding)
idleTimer.Stop()
// Send the received message to all the subscribers.
for _, sub := range p.subscribers {
sub <- rmsg
}
if err != nil {
// In order to allow regression tests with malformed messages, don't
// disconnect the peer when we're in regression test mode and the
Expand Down Expand Up @@ -1446,6 +1472,7 @@ out:
}
break out
}

atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

Expand Down Expand Up @@ -1961,6 +1988,12 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
func (p *Peer) OnDisconnect() <-chan struct{} {
return p.quit
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
Expand Down

0 comments on commit 06e6b63

Please sign in to comment.