-
Notifications
You must be signed in to change notification settings - Fork 6
/
utils.go
211 lines (177 loc) · 5.49 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright (c) 2016 The btcsuite developers
// Copyright (c) 2017-2022 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package dcrdtest
import (
"context"
"reflect"
"time"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4"
"github.com/decred/dcrd/rpcclient/v8"
)
// JoinType is an enum representing a particular type of "node join". A node
// join is a synchronization tool used to wait until a subset of nodes have a
// consistent state with respect to an attribute.
//
// The value is passed to JoinNodes to select which attribute is waited on.
type JoinType uint8
const (
// Blocks is a JoinType which waits until all nodes share the same
// block height.
Blocks JoinType = iota
// Mempools is a JoinType which blocks until all nodes have an identical
// mempool.
Mempools
)
// JoinNodes waits until every harness in nodes agrees on the attribute
// selected by joinType, and only then returns. It is intended to be called
// from rpc tests to bring all active test harnesses to a consistent state
// before an assertion or check is made.
//
// Blocks waits for matching block heights and Mempools waits for matching
// mempools. Any other joinType value is a no-op that returns nil. An error
// from the underlying synchronization routine is returned unchanged.
func JoinNodes(ctx context.Context, nodes []*Harness, joinType JoinType) error {
	if joinType == Blocks {
		return syncBlocks(ctx, nodes)
	}
	if joinType == Mempools {
		return syncMempools(ctx, nodes)
	}

	// Unrecognized join types have nothing to wait for.
	return nil
}
// syncMempools blocks until all nodes have identical mempools.
//
// The mempool of the first node is fetched as the reference and compared
// against the mempool of every other node. If any node differs, the function
// waits 100ms and starts over with a fresh reference. An empty nodes slice is
// trivially in sync and returns nil immediately.
//
// An error is returned if any GetRawMempool call fails or if ctx is done
// while waiting between attempts.
//
// NOTE(review): reflect.DeepEqual is order-sensitive, so this presumes all
// nodes report their mempool entries in a comparable order - confirm.
func syncMempools(ctx context.Context, nodes []*Harness) error {
	// Nothing to compare, and indexing nodes[0] below would panic.
	if len(nodes) == 0 {
		return nil
	}

	for {
		firstPool, err := nodes[0].Node.GetRawMempool(ctx, dcrdtypes.GRMAll)
		if err != nil {
			return err
		}

		// Stop at the first node whose mempool differs from the reference;
		// there is no point querying the rest on this attempt.
		poolsMatch := true
		for _, node := range nodes[1:] {
			nodePool, err := node.Node.GetRawMempool(ctx, dcrdtypes.GRMAll)
			if err != nil {
				return err
			}
			if !reflect.DeepEqual(firstPool, nodePool) {
				poolsMatch = false
				break
			}
		}
		if poolsMatch {
			return nil
		}

		// Wait a short period before retrying, bailing out promptly if the
		// caller cancels the context.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}
// syncBlocks blocks until all nodes report the same block height.
//
// Each attempt queries GetBlockCount on every node in order and compares the
// result with the height reported by the first node. On the first mismatch
// the attempt is abandoned, the function waits 100ms, and then tries again.
// An empty nodes slice is trivially in sync and returns nil.
//
// Only the reported heights are compared.
//
// An error is returned if any GetBlockCount call fails or if ctx is done
// while waiting between attempts.
func syncBlocks(ctx context.Context, nodes []*Harness) error {
	for {
		var refHeight int64
		heightsMatch := true
		for i, node := range nodes {
			height, err := node.Node.GetBlockCount(ctx)
			if err != nil {
				return err
			}

			// The first node provides the reference height for this attempt.
			if i == 0 {
				refHeight = height
				continue
			}
			if height != refHeight {
				heightsMatch = false
				break
			}
		}
		if heightsMatch {
			return nil
		}

		// Wait a short period before retrying, bailing out promptly if the
		// caller cancels the context.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}
// ConnectNode establishes a new peer-to-peer connection between the "from"
// harness and the "to" harness. The connection made is flagged as persistent,
// therefore in the case of disconnects, "from" will attempt to reestablish a
// connection to the "to" harness.
//
// After requesting the connection via AddNode, the function blocks until the
// number of peers reported by "from" exceeds the number it reported before
// the request. The wait is based on the peer count only; it does not check
// that the target address itself appears in the peer list.
//
// An error is returned if any RPC call fails or if ctx is done while waiting
// for the new connection.
func ConnectNode(ctx context.Context, from *Harness, to *Harness) error {
	// Pause between peer list polls so the node is not hammered with RPC
	// requests while the connection is being established.
	const pollInterval = 10 * time.Millisecond

	log.Tracef("ConnectNode start")
	defer log.Tracef("ConnectNode end")

	// Record the peer count before connecting so growth can be detected.
	peerInfo, err := from.Node.GetPeerInfo(ctx)
	if err != nil {
		return err
	}
	numPeers := len(peerInfo)
	log.Tracef("ConnectNode numPeers: %v", numPeers)

	targetAddr := to.P2PAddress()
	if err := from.Node.AddNode(ctx, targetAddr, rpcclient.ANAdd); err != nil {
		return err
	}
	log.Tracef("ConnectNode targetAddr: %v", targetAddr)

	// Block until a new connection has been established.
	peerInfo, err = from.Node.GetPeerInfo(ctx)
	if err != nil {
		return err
	}
	log.Tracef("ConnectNode peerInfo: %v", peerInfo)

	for len(peerInfo) <= numPeers {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}

		peerInfo, err = from.Node.GetPeerInfo(ctx)
		if err != nil {
			return err
		}
	}
	log.Tracef("ConnectNode len(peerInfo): %v", len(peerInfo))

	return nil
}
// RemoveNode removes the peer-to-peer connection between the "from" harness and
// the "to" harness. The connection is only removed in this direction, therefore
// if the reverse connection exists, the nodes may still be connected.
//
// After requesting the removal, the function blocks until the p2p address of
// "to" no longer appears in the peer list reported by "from".
//
// This function returns an error if the nodes were not previously connected,
// if fetching the peer list fails, or if ctx is done while waiting for the
// connection to be dropped.
func RemoveNode(ctx context.Context, from *Harness, to *Harness) error {
	// Pause between peer list polls so the node is not hammered with RPC
	// requests while the connection is being torn down.
	const pollInterval = 10 * time.Millisecond

	targetAddr := to.P2PAddress()
	if err := from.Node.AddNode(ctx, targetAddr, rpcclient.ANRemove); err != nil {
		// AddNode(..., ANRemove) returns an error if the peer is not found
		return err
	}

	// Block until this particular connection has been dropped.
	for {
		peerInfo, err := from.Node.GetPeerInfo(ctx)
		if err != nil {
			return err
		}

		// An explicit flag is used rather than a bare continue inside the
		// range loop, since that would only advance the inner loop and the
		// function would return without ever waiting.
		stillConnected := false
		for _, p := range peerInfo {
			if p.Addr == targetAddr {
				stillConnected = true
				break
			}
		}
		if !stillConnected {
			return nil
		}

		// Nodes still connected. Wait briefly and re-fetch the list of peers.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}
// NodesConnected reports whether the p2p address of "to" appears in the peer
// list of "from". When allowReverse is true and no such entry is found, the
// peer list of "to" is additionally searched for the p2p address of "from".
//
// It returns true as soon as a matching peer entry is found, false if none
// is found, and a non-nil error if fetching a peer list fails.
func NodesConnected(ctx context.Context, from, to *Harness, allowReverse bool) (bool, error) {
	// hasPeer reports whether src lists the p2p address of dst among its
	// current peers.
	hasPeer := func(src, dst *Harness) (bool, error) {
		peers, err := src.Node.GetPeerInfo(ctx)
		if err != nil {
			return false, err
		}

		wantAddr := dst.P2PAddress()
		for i := range peers {
			if peers[i].Addr == wantAddr {
				return true, nil
			}
		}
		return false, nil
	}

	// The forward result is final on error, on success, or when the caller
	// did not ask for the reverse direction to be considered.
	connected, err := hasPeer(from, to)
	if err != nil || connected || !allowReverse {
		return connected, err
	}

	// Check in the reverse direction.
	return hasPeer(to, from)
}