// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2024 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
"context"
"crypto/elliptic"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"math"
"net"
"net/netip"
"os"
"path"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/decred/dcrd/addrmgr/v3"
"github.com/decred/dcrd/blockchain/stake/v5"
"github.com/decred/dcrd/blockchain/standalone/v2"
"github.com/decred/dcrd/certgen"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/connmgr/v3"
"github.com/decred/dcrd/container/apbf"
"github.com/decred/dcrd/container/lru"
"github.com/decred/dcrd/crypto/rand"
"github.com/decred/dcrd/database/v3"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/internal/blockchain"
"github.com/decred/dcrd/internal/blockchain/indexers"
"github.com/decred/dcrd/internal/fees"
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/mining"
"github.com/decred/dcrd/internal/mining/cpuminer"
"github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/internal/version"
"github.com/decred/dcrd/math/uint256"
"github.com/decred/dcrd/mixing"
"github.com/decred/dcrd/mixing/mixpool"
"github.com/decred/dcrd/peer/v3"
"github.com/decred/dcrd/txscript/v4"
"github.com/decred/dcrd/wire"
"github.com/syndtr/goleveldb/leveldb"
)

const (
// defaultServices describes the default services that are supported by
// the server.
defaultServices = wire.SFNodeNetwork

// defaultRequiredServices describes the default services that are
// required to be supported by outbound peers.
defaultRequiredServices = wire.SFNodeNetwork

// defaultTargetOutbound is the default number of outbound peers to
// target.
defaultTargetOutbound = 8

// defaultMaximumVoteAge is the threshold of blocks before the tip
// that can be voted on.
defaultMaximumVoteAge = 1440

// connectionRetryInterval is the base amount of time to wait in between
// retries when connecting to persistent peers.  It is adjusted by the
// number of retries such that there is a retry backoff.
connectionRetryInterval = time.Second * 5

// maxProtocolVersion is the max protocol version the server supports.
maxProtocolVersion = wire.BatchedCFiltersV2Version

// These fields are used to track known addresses on a per-peer basis.
//
// maxKnownAddrsPerPeer is the maximum number of items to track.
//
// knownAddrsFPRate is the false positive rate for the APBF used to track
// them.  It is set to a rate of 1 per 1000 since addresses are not very
// large and they only need to be filtered once per connection, so an extra
// 10 of them being sent (on average) again even though they technically
// wouldn't need to be is a good tradeoff.
//
// These values result in about 40 KiB memory usage including overhead.  An
// illustrative sketch of constructing such a filter follows this block.
maxKnownAddrsPerPeer = 10000
knownAddrsFPRate = 0.001

// maxCachedNaSubmissions is the maximum number of network address
// submissions cached.
maxCachedNaSubmissions = 20

// These constants control the maximum number of simultaneous pending
// getdata messages and the individual data item requests they make without
// being disconnected.
//
// Since each getdata message is comprised of several individual data item
// requests, the limiting is applied on both dimensions to offer more
// flexibility while still keeping memory usage bounded to reasonable
// limits.
//
// maxConcurrentGetDataReqs is the maximum number of simultaneous pending
// getdata message requests.
//
// maxPendingGetDataItemReqs is the maximum number of overall total
// simultaneous pending individual data item requests.
//
// In other words, when combined, a peer may mix and match simultaneous
// getdata requests for varying amounts of data items so long as it does not
// exceed the maximum specified number of simultaneous pending getdata
// messages or the maximum number of total overall pending individual data
// item requests.
maxConcurrentGetDataReqs = 1000
maxPendingGetDataItemReqs = 2 * wire.MaxInvPerMsg

// maxReorgDepthNotify specifies the maximum reorganization depth for which
// winning ticket notifications will be sent over RPC.  The reorg depth is
// the number of blocks that would be reorganized out of the current best
// chain if a side chain being considered for notifications were to
// ultimately be extended to be longer than the current one.
//
// In effect, this helps to prevent large reorgs by refusing to send the
// winning ticket information to RPC clients, such as voting wallets, which
// depend on it to cast votes.
//
// This check also doubles to help reduce exhaustion attacks that could
// otherwise arise from sending old orphan blocks and forcing nodes to do
// expensive lottery data calculations for them.
maxReorgDepthNotify = 6

// These fields are used to track recently confirmed transactions.
//
// maxRecentlyConfirmedTxns specifies the maximum number to track and is set
// to target tracking the maximum number of transactions of the minimum
// realistic size (~206 bytes) in approximately one hour of blocks on the
// main network.
//
// recentlyConfirmedTxnsFPRate is the false positive rate for the APBF used
// to track them and is set to a rate of 1 per 1 million which supports up
// to ~11.5 transactions/s before a single false positive would be seen on
// average and thus allows for plenty of future growth.
//
// These values result in about 183 KiB memory usage including overhead.
maxRecentlyConfirmedTxns = 23000
recentlyConfirmedTxnsFPRate = 0.000001

// These fields are used when caching recently advertised transactions.
//
// maxRecentlyAdvertisedTxns specifies the maximum number to cache and is
// set to target tracking the maximum number of transactions of the minimum
// realistic size (~206 bytes) in approximately two blocks on the main
// network plus an additional 20%.
//
// recentlyAdvertisedTxnsTTL is the time to keep recently advertised
// transactions in the cache before they are expired.
//
// These values result in about 640 KiB memory usage including overhead.
maxRecentlyAdvertisedTxns = 4500
recentlyAdvertisedTxnsTTL = 45 * time.Second
)
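
// Illustrative sketch (not part of the original file): both the per-peer
// known address tracking and the recently confirmed transaction tracking
// described above are backed by age-partitioned bloom filters from the
// container/apbf package, sized with the constants in the block above.
// Constructing and querying one looks roughly like the following, where
// addressKey is a placeholder for an address manager NetAddress key:
//
//	filter := apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate)
//	filter.Add([]byte(addressKey))
//	if filter.Contains([]byte(addressKey)) {
//		// Already seen (subject to the configured false positive
//		// rate), so avoid re-sending it.
//	}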

var (
// userAgentName is the user agent name and is used to help identify
// ourselves to other Decred peers.
userAgentName = "dcrd"

// userAgentVersion is the user agent version and is used to help
// identify ourselves to other peers.
userAgentVersion = fmt.Sprintf("%d.%d.%d", version.Major, version.Minor,
version.Patch)
)

// simpleAddr implements the net.Addr interface with two struct fields.
type simpleAddr struct {
net, addr string
}

// String returns the address.
//
// This is part of the net.Addr interface.
func (a simpleAddr) String() string {
return a.addr
}

// Network returns the network.
//
// This is part of the net.Addr interface.
func (a simpleAddr) Network() string {
return a.net
}

// Ensure simpleAddr implements the net.Addr interface.
var _ net.Addr = simpleAddr{}

// broadcastMsg provides the ability to house a Decred message to be broadcast
// to all connected peers except specified excluded peers.
type broadcastMsg struct {
message wire.Message
excludePeers []*serverPeer
}

// broadcastInventoryAdd is a type used to declare that the InvVect it contains
// needs to be added to the rebroadcast map.
type broadcastInventoryAdd relayMsg

// broadcastInventoryDel is a type used to declare that the InvVect it contains
// needs to be removed from the rebroadcast map.
type broadcastInventoryDel *wire.InvVect

// broadcastPruneInventory is a type used to declare that rebroadcast
// inventory entries need to be filtered and removed where necessary.
type broadcastPruneInventory struct{}

// relayMsg packages an inventory vector along with the newly discovered
// inventory and a flag that determines if the relay should happen immediately
// (it will be put into a trickle queue if false) so the relay has access to
// that information.
type relayMsg struct {
invVect *wire.InvVect
data interface{}
immediate bool
reqServices wire.ServiceFlag
}

// naSubmission represents a network address submission from an outbound peer.
type naSubmission struct {
na *wire.NetAddress
netType addrmgr.NetAddressType
reach addrmgr.NetAddressReach
score uint32
lastAccessed int64
}

// naSubmissionCache represents a bounded map for network address submissions.
type naSubmissionCache struct {
cache map[string]*naSubmission
limit int
mtx sync.Mutex
}

// add caches the provided address submission.
func (sc *naSubmissionCache) add(sub *naSubmission) error {
if sub == nil {
return fmt.Errorf("submission cannot be nil")
}
key := sub.na.IP.String()
if key == "" {
return fmt.Errorf("submission key cannot be an empty string")
}
sc.mtx.Lock()
defer sc.mtx.Unlock()
// Remove the oldest submission if cache limit has been reached.
if len(sc.cache) == sc.limit {
var oldestSub *naSubmission
for _, sub := range sc.cache {
if oldestSub == nil {
oldestSub = sub
continue
}
if sub.lastAccessed < oldestSub.lastAccessed {
oldestSub = sub
}
}
if oldestSub != nil {
delete(sc.cache, oldestSub.na.IP.String())
}
}
sub.score = 1
sub.lastAccessed = time.Now().Unix()
sc.cache[key] = sub
return nil
}

// exists returns true if the provided key exists in the submissions cache.
func (sc *naSubmissionCache) exists(key string) bool {
if key == "" {
return false
}
sc.mtx.Lock()
_, ok := sc.cache[key]
sc.mtx.Unlock()
return ok
}

// incrementScore increases the score of the address submission referenced by
// the provided key by one.
func (sc *naSubmissionCache) incrementScore(key string) error {
if key == "" {
return fmt.Errorf("submission key cannot be an empty string")
}
sc.mtx.Lock()
defer sc.mtx.Unlock()
sub, ok := sc.cache[key]
if !ok {
return fmt.Errorf("submission key not found: %s", key)
}
sub.score++
sub.lastAccessed = time.Now().Unix()
sc.cache[key] = sub
return nil
}

// bestSubmission fetches the best scoring submission for the provided
// network address type.
func (sc *naSubmissionCache) bestSubmission(net addrmgr.NetAddressType) *naSubmission {
sc.mtx.Lock()
defer sc.mtx.Unlock()
var best *naSubmission
for _, sub := range sc.cache {
if sub.netType != net {
continue
}
if best == nil {
best = sub
continue
}
if sub.score > best.score {
best = sub
}
}
return best
}
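
// Illustrative sketch (not part of the original file): a typical flow for the
// cache above is to add a submission the first time an outbound peer reports
// an address, bump its score when additional peers report the same address,
// and finally pick the highest scoring entry for a network type.  Assuming na
// is a previously obtained *wire.NetAddress for an IPv4 peer:
//
//	sc := &naSubmissionCache{
//		cache: make(map[string]*naSubmission, maxCachedNaSubmissions),
//		limit: maxCachedNaSubmissions,
//	}
//	if !sc.exists(na.IP.String()) {
//		_ = sc.add(&naSubmission{na: na, netType: addrmgr.IPv4Address})
//	} else {
//		_ = sc.incrementScore(na.IP.String())
//	}
//	best := sc.bestSubmission(addrmgr.IPv4Address) // highest score, or nil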

// peerState houses state of inbound, persistent, and outbound peers as well
// as banned peers and outbound groups.
type peerState struct {
sync.Mutex

// The following fields are protected by the embedded mutex.
inboundPeers map[int32]*serverPeer
outboundPeers map[int32]*serverPeer
persistentPeers map[int32]*serverPeer
banned map[string]time.Time
outboundGroups map[string]int

// subCache houses the network address submission cache and is protected
// by its own mutex.
subCache *naSubmissionCache
}

// makePeerState returns a peer state instance that is used to maintain the
// state of inbound, persistent, and outbound peers as well as banned peers and
// outbound groups.
func makePeerState() peerState {
return peerState{
inboundPeers: make(map[int32]*serverPeer),
persistentPeers: make(map[int32]*serverPeer),
outboundPeers: make(map[int32]*serverPeer),
banned: make(map[string]time.Time),
outboundGroups: make(map[string]int),
subCache: &naSubmissionCache{
cache: make(map[string]*naSubmission, maxCachedNaSubmissions),
limit: maxCachedNaSubmissions,
},
}
}

// count returns the count of all known peers.
//
// This function MUST be called with the embedded mutex locked (for reads).
func (ps *peerState) count() int {
return len(ps.inboundPeers) + len(ps.outboundPeers) +
len(ps.persistentPeers)
}

// forAllOutboundPeers is a helper function that runs closure on all outbound
// peers known to peerState.
//
// This function MUST be called with the embedded mutex locked (for reads).
func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
for _, e := range ps.outboundPeers {
closure(e)
}
for _, e := range ps.persistentPeers {
closure(e)
}
}

// forAllPeers is a helper function that runs closure on all peers known to
// peerState.
//
// This function MUST be called with the embedded mutex locked (for reads).
func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
for _, e := range ps.inboundPeers {
closure(e)
}
ps.forAllOutboundPeers(closure)
}

// ForAllPeers is a helper function that runs closure on all peers known to
// peerState.
//
// This function is safe for concurrent access.
func (ps *peerState) ForAllPeers(closure func(sp *serverPeer)) {
ps.Lock()
ps.forAllPeers(closure)
ps.Unlock()
}
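
// For example (illustrative, not part of the original file), a caller can
// count the currently known peers without any manual locking:
//
//	var numPeers int
//	ps.ForAllPeers(func(sp *serverPeer) {
//		numPeers++
//	})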

// connectionsWithIP returns the number of connections with the given IP.
//
// This function MUST be called with the embedded mutex locked (for reads).
func (ps *peerState) connectionsWithIP(ip net.IP) int {
var total int
ps.forAllPeers(func(sp *serverPeer) {
if ip.Equal(sp.NA().IP) {
total++
}
})
return total
}

type resolveIPFn func(string) ([]net.IP, error)

// hostToNetAddress parses and returns an address manager network address given
// a hostname in a supported format (IPv4, IPv6). If the hostname cannot be
// immediately converted from a known address format, it will be resolved using
// the provided DNS resolver function. If it cannot be resolved, an error is
// returned.
//
// This function is safe for concurrent access.
func hostToNetAddress(host string, port uint16, services wire.ServiceFlag, resolver resolveIPFn) (*addrmgr.NetAddress, error) {
addrType, addrBytes := addrmgr.EncodeHost(host)
if addrType != addrmgr.UnknownAddressType {
// Since the host type has been successfully recognized and encoded,
// there is no need to perform a DNS lookup.
now := time.Unix(time.Now().Unix(), 0)
return addrmgr.NewNetAddressFromParams(addrType, addrBytes, port, now, services)
}
// Cannot determine the host address type. Must use DNS.
ips, err := resolver(host)
if err != nil {
return nil, err
}
if len(ips) == 0 {
return nil, fmt.Errorf("no addresses found for %s", host)
}
na := addrmgr.NewNetAddressFromIPPort(ips[0], port, services)
return na, nil
}
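
// Illustrative sketch (not part of the original file): because the resolver
// is injected via resolveIPFn, callers can pass net.LookupIP for real DNS
// resolution or a stub for tests.  The host, port, and services here are
// arbitrary example values:
//
//	stubResolver := func(host string) ([]net.IP, error) {
//		return []net.IP{net.ParseIP("203.0.113.5")}, nil
//	}
//	na, err := hostToNetAddress("node.example.org", 9108,
//		wire.SFNodeNetwork, stubResolver)
//	if err != nil {
//		// Handle the resolution/parsing failure.
//	}
//	_ = na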

// ResolveLocalAddress picks the best suggested network address from available
// options, per the provided network address type.  The best suggestion, if
// found, is added as a local address.
//
// This function is safe for concurrent access.
func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr *addrmgr.AddrManager, services wire.ServiceFlag) {
best := ps.subCache.bestSubmission(netType)
if best == nil {
return
}
targetOutbound := defaultTargetOutbound
if cfg.MaxPeers < targetOutbound {
targetOutbound = cfg.MaxPeers
}
// A valid best address suggestion must have at least a 60 percent
// majority of outbound peers concluding on the same result.
if best.score < uint32(math.Ceil(float64(targetOutbound)*0.6)) {
return
}
addLocalAddress := func(bestSuggestion string, port uint16, services wire.ServiceFlag) {
na, err := hostToNetAddress(bestSuggestion, port, services, dcrdLookup)
if err != nil {
amgrLog.Errorf("unable to generate network address using host %v: "+
"%v", bestSuggestion, err)
return
}
if !addrMgr.HasLocalAddress(na) {
err := addrMgr.AddLocalAddress(na, addrmgr.ManualPrio)
if err != nil {
amgrLog.Errorf("unable to add local address: %v", err)
return
}
}
}
stripIPv6Zone := func(ip string) string {
// Strip IPv6 zone id if present.
zoneIndex := strings.LastIndex(ip, "%")
if zoneIndex > 0 {
return ip[:zoneIndex]
}
return ip
}
for _, listener := range cfg.Listeners {
host, portStr, err := net.SplitHostPort(listener)
if err != nil {
amgrLog.Errorf("unable to split network address: %v", err)
return
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
amgrLog.Errorf("unable to parse port: %v", err)
return
}
host = stripIPv6Zone(host)
// Add a local address if the best suggestion is referenced by a
// listener.
if best.na.IP.String() == host {
addLocalAddress(best.na.IP.String(), uint16(port), services)
continue
}
// Add a local address if the listener is generic (applies
// for both IPv4 and IPv6).
if host == "" || (host == "*" && runtime.GOOS == "plan9") {
addLocalAddress(best.na.IP.String(), uint16(port), services)
continue
}
listenerIP := net.ParseIP(host)
if listenerIP == nil {
amgrLog.Errorf("unable to parse listener: %v", host)
return
}
// Add a local address if the network address is a probable external
// endpoint of the listener.
lNet := addrmgr.IPv4Address
if listenerIP.To4() == nil {
lNet = addrmgr.IPv6Address
}
validExternal := (lNet == addrmgr.IPv4Address &&
best.reach == addrmgr.Ipv4) || lNet == addrmgr.IPv6Address &&
(best.reach == addrmgr.Ipv6Weak || best.reach == addrmgr.Ipv6Strong ||
best.reach == addrmgr.Teredo)
if validExternal {
addLocalAddress(best.na.IP.String(), uint16(port), services)
continue
}
}
}
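
// As a concrete example of the score threshold above (illustrative, not part
// of the original file): with the default of 8 target outbound peers, a
// suggestion needs a score of at least ceil(8 * 0.6) = 5 agreeing outbound
// peers before it is added as a local address:
//
//	required := uint32(math.Ceil(float64(defaultTargetOutbound) * 0.6)) // 5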

// server provides a Decred server for handling communications to and from
// Decred peers.
type server struct {
bytesReceived atomic.Uint64 // Total bytes received from all peers since start.
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
shutdown atomic.Bool

// targetOutbound is the calculated number of target outbound peers to
// maintain. It is set at creation time and never modified afterwards, so
// it does not need to be protected for concurrent access.
targetOutbound uint32

// minKnownWork houses the minimum known work from the associated network
// params converted to a uint256 so the conversion only needs to be
// performed once when the server is initialized. Ideally, the chain params
// should be updated to use the new type, but that will be a major version
// bump, so a one-time conversion is a good tradeoff in the mean time.
minKnownWork uint256.Uint256
chainParams *chaincfg.Params
addrManager *addrmgr.AddrManager
connManager *connmgr.ConnManager
sigCache *txscript.SigCache
subsidyCache *standalone.SubsidyCache
rpcServer *rpcserver.Server
syncManager *netsync.SyncManager
bg *mining.BgBlkTmplGenerator
chain *blockchain.BlockChain
txMemPool *mempool.TxPool
feeEstimator *fees.Estimator
cpuMiner *cpuminer.CPUMiner
mixMsgPool *mixpool.Pool
modifyRebroadcastInv chan interface{}
peerState peerState
relayInv chan relayMsg
broadcast chan broadcastMsg
nat *upnpNAT
db database.DB
timeSource blockchain.MedianTimeSource
services wire.ServiceFlag
quit chan struct{}

// The following fields are used for optional indexes.  They will be nil
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
// do not need to be protected for concurrent access.
indexSubscriber *indexers.IndexSubscriber
txIndex *indexers.TxIndex
existsAddrIndex *indexers.ExistsAddrIndex

// The following fields are used to filter duplicate block lottery data
// announcements.
lotteryDataBroadcastMtx sync.Mutex
lotteryDataBroadcast map[chainhash.Hash]struct{}

// recentlyConfirmedTxns tracks transactions that have been confirmed in the
// most recent blocks.
recentlyConfirmedTxns *apbf.Filter

// recentlyAdvertisedTxns caches transactions that have recently been
// advertised to other peers. The cache handles automatic expiration and
// maximum entry limiting.
//
// It is considered misbehavior to advertise a transaction and then claim it
// is not found when the corresponding request arrives. Further, since the
// mempool only contains the unconfirmed transactions as of the current best
// chain tip, a transaction might be advertised when it is first added to
// the mempool and then removed from the mempool prior to it being requested
// in the case new blocks are connected in between the advertisement and
// request.
//
// Thus, maintaining a separate cache of advertised transactions increases
// the probability they are available to serve regardless of whether or not
// they are still in the mempool when a request for the advertisement
// arrives.
//
// Note that it might be tempting to keep track of the number of times a tx
// has been advertised and requested so it can be removed from the cache as
// soon as there are no longer any potential outstanding requests, however,
// that is intentionally not done because it is exceedingly rare for
// advertisements to result in a request from all peers, so the extra
// overhead is not warranted.
recentlyAdvertisedTxns *lru.Map[chainhash.Hash, *dcrutil.Tx]

// The following fields are used to periodically log the total number of
// evicted recently advertised transactions.  They are only accessed from
// a single long-running goroutine, so they are not protected for concurrent
// access.
//
// totalAdvertisedTxnsEvicted is the total number of advertised transactions
// that have been evicted from the cache since the previous report.
//
// lastAdvertisedTxnsEvictedLogged is the last time the total number of
// advertised transactions that have been evicted from the cache was
// reported.
totalAdvertisedTxnsEvicted uint64
lastAdvertisedTxnsEvictedLogged time.Time
}

// serverPeer extends the peer to maintain state shared by the server.
type serverPeer struct {
*peer.Peer

// These fields are set at creation time and never modified afterwards, so
// they do not need to be protected for concurrent access.
server *server
persistent bool
isWhitelisted bool
quit chan struct{}

// syncMgrPeer houses the network sync manager peer instance that wraps the
// underlying peer similar to the way this server peer itself wraps it.
syncMgrPeer *netsync.Peer

// All fields below this point are either not set at creation time or are
// otherwise modified during operation and thus need to consider whether or
// not they need to be protected for concurrent access.
connReq atomic.Pointer[connmgr.ConnReq]
continueHash atomic.Pointer[chainhash.Hash]
disableRelayTx atomic.Bool
knownAddresses *apbf.Filter
banScore connmgr.DynamicBanScore

// addrsSent, getMiningStateSent and initStateSent track whether or not the
// peer has already sent the respective request.  They are used to prevent more
// than one response of each respective request per connection.
//
// They are only accessed directly in callbacks which all run in the same
// peer input handler goroutine and thus do not need to be protected for
// concurrent access.
addrsSent bool
getMiningStateSent bool
initStateSent bool

// The following fields are used to synchronize the net sync manager and
// server.
txProcessed chan struct{}
blockProcessed chan struct{}
mixMsgProcessed chan error

// peerNa is the network address of the connected peer.
peerNa atomic.Pointer[wire.NetAddress]

// announcedBlock tracks the most recent block announced to this peer and is
// used to filter duplicates.
//
// It is only accessed in the goroutine that handles relaying inventory and
// thus does not need to be protected for concurrent access.
announcedBlock *chainhash.Hash

// The following fields are used to serve getdata requests asynchronously as
// opposed to directly in the peer input handler.
//
// getDataQueue is a buffered channel for queueing up concurrent getdata
// requests.
//
// numPendingGetDataItemReqs tracks the total number of pending individual
// data item requests that still need to be served.
getDataQueue chan []*wire.InvVect
numPendingGetDataItemReqs atomic.Uint32
}

// newServerPeer returns a new serverPeer instance. The peer needs to be set by
// the caller.
func newServerPeer(s *server, isPersistent bool) *serverPeer {
return &serverPeer{
server: s,
persistent: isPersistent,
knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate),
quit: make(chan struct{}),
txProcessed: make(chan struct{}, 1),
blockProcessed: make(chan struct{}, 1),
mixMsgProcessed: make(chan error, 1),
getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs),
}
}

// handleServeGetData is the primary logic for servicing queued getdata
// requests.
//
// It makes use of the given send done channel and semaphore to provide
// a little pipelining of database loads while keeping the memory usage bounded
// to reasonable limits.
//
// It is invoked from the serveGetData goroutine.
func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
sendDoneChan chan struct{}, semaphore chan struct{}) {
var notFoundMsg *wire.MsgNotFound
for _, iv := range invVects {
var sendInv bool
var dataMsg wire.Message
switch iv.Type {
case wire.InvTypeTx:
// Attempt to fetch the requested transaction. Try the pool of
// recently advertised transactions first and then fall back to the
// mempool.
//
// Note that this does not allow peers to request transactions
// already in a block over p2p unless they still happen to be in the
// pool of advertised transactions, as that would require all nodes
// to maintain a full transaction index which can be expensive.
// That ability is restricted to authenticated RPC only and requires
// the aforementioned full transaction index.
txHash := &iv.Hash
tx, ok := sp.server.recentlyAdvertisedTxns.Get(*txHash)
if !ok {
// Note that a call could be made to check for existence first,
// but simply trying to fetch a missing transaction results in
// the same behavior.
var err error
tx, err = sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Debugf("Unable to fetch tx %v from transaction "+
"pool for peer %s: %v", txHash, sp, err)
break
}
}
dataMsg = tx.MsgTx()
case wire.InvTypeBlock:
blockHash := &iv.Hash
block, err := sp.server.chain.BlockByHash(blockHash)
if err != nil {
peerLog.Debugf("Unable to fetch block hash %v for peer %s: %v",
blockHash, sp, err)
break
}
dataMsg = block.MsgBlock()
// When the peer requests the final block that was advertised in
// response to a getblocks message which requested more blocks than
// would fit into a single message, it requires a new inventory
// message to trigger it to issue another getblocks message for the
// next batch of inventory.
//
// However, that inventory message should not be sent until after
// the block itself is sent, so keep a flag for later use.
//
// Note that this is to support the legacy syncing model that is no
// longer used in dcrd which is now based on a much more robust
// headers-based syncing model. Nevertheless, this behavior is
// still a required part of the getblocks protocol semantics. It
// can be removed if a future protocol upgrade also removes the
// getblocks message.
continueHash := sp.continueHash.Load()
sendInv = continueHash != nil && *continueHash == *blockHash
case wire.InvTypeMix:
mixHash := &iv.Hash
msg, ok := sp.server.mixMsgPool.RecentMessage(mixHash)
if !ok {
peerLog.Debugf("Unable to fetch mix message %v from the mix "+
"pool for peer %s", mixHash, sp)
break
}
dataMsg = msg
default:
peerLog.Warnf("Unknown type '%d' in inventory request from %s",
iv.Type, sp)
continue
}
if dataMsg == nil {
// Keep track of all items that were not found in order to send a
// consolidated message once the entire batch is processed.
//
// The error when adding the inventory vector is ignored because the
// only way it could fail would be by exceeding the max allowed
// number of items which is impossible given the getdata message is
// enforced to not exceed that same maximum limit.
if notFoundMsg == nil {
notFoundMsg = wire.NewMsgNotFound()
}
notFoundMsg.AddInvVect(iv)
// There is no need to wait for the semaphore below when there is
// not any data to send.
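//
// Adding ^uint32(0) (all bits set, equivalent to -1 modulo 2^32) to the
// atomic.Uint32 counter decrements it by one.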
sp.numPendingGetDataItemReqs.Add(^uint32(0))
continue
}
// Limit the number of items that can be queued to prevent wasting a
// bunch of memory by queuing far more data than can be sent in a
// reasonable time. The waiting occurs after the database fetch for the
// next one to provide a little pipelining.
//
// This also monitors the channel that is notified when queued messages
// are sent in order to release the semaphore without needing a separate
// monitoring goroutine.
for semAcquired := false; !semAcquired; {
select {
case <-sp.quit:
return
case semaphore <- struct{}{}:
semAcquired = true
case <-sendDoneChan:
// Release semaphore.
<-semaphore
}
}
// Decrement the pending data item requests accordingly and queue the
// data to be sent to the peer.
sp.numPendingGetDataItemReqs.Add(^uint32(0))
sp.QueueMessage(dataMsg, sendDoneChan)
// Send a new inventory message to trigger the peer to issue another
// getblocks message for the next batch of inventory if needed.
if sendInv {
best := sp.server.chain.BestSnapshot()
invMsg := wire.NewMsgInvSizeHint(1)
iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
invMsg.AddInvVect(iv)
sp.QueueMessage(invMsg, nil)
sp.continueHash.Store(nil)
}
}
if notFoundMsg != nil {
sp.QueueMessage(notFoundMsg, nil)
}
}

// serveGetData provides an asynchronous queue that services all data requested
// via getdata requests such that the peer may mix and match simultaneous
// getdata requests for varying amounts of data items so long as it does not
// exceed the maximum number of simultaneous pending getdata messages or the
// maximum number of total overall pending data item requests.
//
// It must be run in a goroutine.
func (sp *serverPeer) serveGetData() {
// Allow a max number of items to be loaded from the database/mempool and
// queued for send.
const maxPendingSend = 3
sendDoneChan := make(chan struct{}, maxPendingSend+1)
semaphore := make(chan struct{}, maxPendingSend)
for {
select {
case <-sp.quit:
return
case invVects := <-sp.getDataQueue:
sp.handleServeGetData(invVects, sendDoneChan, semaphore)
// Release the semaphore as queued messages are sent.
case <-sendDoneChan:
<-semaphore
}
}
}
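
// Illustrative sketch (not part of the original file) of the pipelining
// pattern used by serveGetData and handleServeGetData, with queueMessage
// standing in for the peer's QueueMessage: a buffered channel acts as a
// counting semaphore that limits how many loaded messages may be queued for
// send at once, and the send-done channel releases a slot as each queued
// message finishes sending:
//
//	const maxPendingSend = 3
//	sendDoneChan := make(chan struct{}, maxPendingSend+1)
//	semaphore := make(chan struct{}, maxPendingSend)
//	go func() {
//		for range sendDoneChan {
//			<-semaphore // release one slot per completed send
//		}
//	}()
//	for _, msg := range messagesToLoadAndSend {
//		semaphore <- struct{}{} // acquire before loading/queueing more
//		queueMessage(msg, sendDoneChan)
//	}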

// Run starts additional async processing for the peer and blocks until the peer
// disconnects at which point it notifies the server and net sync manager that
// the peer has disconnected and performs other associated cleanup such as
// evicting any remaining orphans sent by the peer and shutting down all
// goroutines.
func (sp *serverPeer) Run() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
sp.serveGetData()
wg.Done()
}()
// Wait for the peer to disconnect and notify the net sync manager and
// server accordingly.
sp.WaitForDisconnect()
srvr := sp.server
srvr.DonePeer(sp)
srvr.syncManager.PeerDisconnected(sp.syncMgrPeer)
if sp.VersionKnown() {
// Evict any remaining orphans that were sent by the peer.
numEvicted := srvr.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
if numEvicted > 0 {
srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", numEvicted,
pickNoun(numEvicted, "orphan", "orphans"), sp, sp.ID())
}
}
// Shutdown remaining peer goroutines.
close(sp.quit)
wg.Wait()
}

// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *serverPeer) newestBlock() (*chainhash.Hash, int64, error) {
best := sp.server.chain.BestSnapshot()
return &best.Hash, best.Height, nil
}

// addKnownAddress adds the given address to the set of addresses known to
// the peer to prevent sending duplicate addresses.
func (sp *serverPeer) addKnownAddress(na *addrmgr.NetAddress) {
sp.knownAddresses.Add([]byte(na.Key()))
}

// addKnownAddresses adds the given addresses to the set of addresses known
// to the peer to prevent sending duplicate addresses.
func (sp *serverPeer) addKnownAddresses(addresses []*addrmgr.NetAddress) {
for _, na := range addresses {
sp.addKnownAddress(na)
}
}

// addressKnown returns true if the given address is already known to the peer.
func (sp *serverPeer) addressKnown(na *addrmgr.NetAddress) bool {
return sp.knownAddresses.Contains([]byte(na.Key()))
}

// wireToAddrmgrNetAddress converts a wire NetAddress to an address manager
// NetAddress.
func wireToAddrmgrNetAddress(netAddr *wire.NetAddress) *addrmgr.NetAddress {
newNetAddr := addrmgr.NewNetAddressFromIPPort(netAddr.IP, netAddr.Port,
netAddr.Services)
newNetAddr.Timestamp = netAddr.Timestamp
return newNetAddr
}

// wireToAddrmgrNetAddresses converts a collection of wire net addresses to a
// collection of address manager net addresses.
func wireToAddrmgrNetAddresses(netAddr []*wire.NetAddress) []*addrmgr.NetAddress {
addrs := make([]*addrmgr.NetAddress, len(netAddr))
for i, wireAddr := range netAddr {
addrs[i] = wireToAddrmgrNetAddress(wireAddr)
}
return addrs
}