forked from anthdm/hbbft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
local_transport.go
84 lines (73 loc) · 1.89 KB
/
local_transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package hbbft
import (
"fmt"
"sync"
)
// LocalTransport implements a local Transport. This is used to test hbbft
// without going over the network.
type LocalTransport struct {
lock sync.RWMutex
peers map[uint64]*LocalTransport
consumeCh chan RPC
addr uint64
}
// NewLocalTransport returns a new LocalTransport.
func NewLocalTransport(addr uint64) *LocalTransport {
return &LocalTransport{
peers: make(map[uint64]*LocalTransport),
consumeCh: make(chan RPC, 1024), // nodes * nodes should be fine here,
addr: addr,
}
}
// Consume implements the Transport interface.
func (t *LocalTransport) Consume() <-chan RPC {
return t.consumeCh
}
// SendProofMessages implements the Transport interface.
func (t *LocalTransport) SendProofMessages(id uint64, msgs []interface{}) error {
i := 0
for addr := range t.peers {
if err := t.makeRPC(id, addr, msgs[i]); err != nil {
return err
}
i++
}
return nil
}
// Broadcast implements the Transport interface.
func (t *LocalTransport) Broadcast(id uint64, msg interface{}) error {
for addr := range t.peers {
if err := t.makeRPC(id, addr, msg); err != nil {
return err
}
}
return nil
}
// SendMessage implements the transport interface.
func (t *LocalTransport) SendMessage(from, to uint64, msg interface{}) error {
return t.makeRPC(from, to, msg)
}
// Connect implements the Transport interface.
func (t *LocalTransport) Connect(addr uint64, tr Transport) {
trans := tr.(*LocalTransport)
t.lock.Lock()
defer t.lock.Unlock()
t.peers[addr] = trans
}
// Addr implements the Transport interface.
func (t *LocalTransport) Addr() uint64 {
return t.addr
}
func (t *LocalTransport) makeRPC(id, addr uint64, msg interface{}) error {
t.lock.RLock()
peer, ok := t.peers[addr]
t.lock.RUnlock()
if !ok {
return fmt.Errorf("failed to connect with %d", addr)
}
peer.consumeCh <- RPC{
NodeID: id,
Payload: msg,
}
return nil
}