forked from libp2p/go-libp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
/
echo.go
164 lines (128 loc) · 4.26 KB
/
echo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package main
import (
"fmt"
"io"
"log"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
pb "github.com/libp2p/go-libp2p/examples/multipro/pb"
)
// Protocol IDs used by the echo protocol.
// Pattern: /protocol-name/request-or-response-message/version
const (
	echoRequest  = "/echo/echoreq/0.0.1"
	echoResponse = "/echo/echoresp/0.0.1"
)
// EchoProtocol holds the state of the echo request/response protocol for one
// local node.
type EchoProtocol struct {
	// node is the local host.
	node *Node
	// requests keeps sent requests, keyed by message id, so that response
	// handlers can access the request data.
	requests map[string]*pb.EchoRequest
	// done exists only for demo purposes, to hold main from terminating.
	done chan bool
}
// NewEchoProtocol creates an EchoProtocol for node and registers its request
// and response stream handlers on that node.
//
// done is stored on the returned value for the handlers to signal on.
func NewEchoProtocol(node *Node, done chan bool) *EchoProtocol {
	ep := &EchoProtocol{
		node:     node,
		requests: make(map[string]*pb.EchoRequest),
		done:     done,
	}

	// Design note: to implement fire-and-forget style messages you may just
	// skip specifying a response callback. A fire-and-forget message will
	// just include a request and not specify a response object.
	node.SetStreamHandler(echoRequest, ep.onEchoRequest)
	node.SetStreamHandler(echoResponse, ep.onEchoResponse)

	return ep
}
// onEchoRequest handles an echo request stream opened by a remote peer.
//
// It reads the whole stream, decodes it as a pb.EchoRequest, authenticates
// it, then builds and signs a pb.EchoResponse carrying the same message id
// and message text and passes it to sendProtoMessage under the echoResponse
// protocol id. Read, decode, validation and signing failures are logged and
// the request is dropped without signalling done. Once a response has been
// attempted (whether or not the send succeeded) done is signalled.
func (e *EchoProtocol) onEchoRequest(s network.Stream) {
	// get request data
	buf, err := io.ReadAll(s)
	if err != nil {
		s.Reset()
		log.Println(err)
		return
	}
	s.Close()

	// unmarshal it
	data := &pb.EchoRequest{}
	if err := proto.Unmarshal(buf, data); err != nil {
		log.Println(err)
		return
	}

	local, remote := s.Conn().LocalPeer(), s.Conn().RemotePeer()
	log.Printf("%s: Received echo request from %s. Message: %s", local, remote, data.Message)

	// The payload comes from an untrusted peer: a request that omits the
	// MessageData sub-message decodes with a nil field, which would panic
	// when dereferenced below. Reject it up front.
	if data.MessageData == nil {
		log.Println("Echo request is missing message data")
		return
	}

	if valid := e.node.authenticateMessage(data, data.MessageData); !valid {
		log.Println("Failed to authenticate message")
		return
	}

	log.Printf("%s: Sending echo response to %s. Message id: %s...", local, remote, data.MessageData.Id)

	// send response to the request using the message string the peer provided
	resp := &pb.EchoResponse{
		MessageData: e.node.NewMessageData(data.MessageData.Id, false),
		Message:     data.Message,
	}

	// sign the data
	signature, err := e.node.signProtoMessage(resp)
	if err != nil {
		log.Printf("failed to sign response: %v", err)
		return
	}

	// add the signature to the message
	resp.MessageData.Sign = signature

	if ok := e.node.sendProtoMessage(remote, echoResponse, resp); ok {
		log.Printf("%s: Echo response to %s sent.", local, remote)
	}
	e.done <- true
}
// onEchoResponse handles an echo response stream opened by a remote peer.
//
// It reads the whole stream, decodes it as a pb.EchoResponse, authenticates
// it and looks up the matching request in e.requests by message id. A found
// request is removed from the map. Read, decode, validation and lookup
// failures are logged and the response is dropped without signalling done.
// If the echoed message differs from the request's message the process is
// terminated via log.Fatalln. On success done is signalled.
func (e *EchoProtocol) onEchoResponse(s network.Stream) {
	buf, err := io.ReadAll(s)
	if err != nil {
		s.Reset()
		log.Println(err)
		return
	}
	s.Close()

	// unmarshal it
	data := &pb.EchoResponse{}
	if err := proto.Unmarshal(buf, data); err != nil {
		log.Println(err)
		return
	}

	// The payload comes from an untrusted peer: a response that omits the
	// MessageData sub-message decodes with a nil field, which would panic
	// when dereferenced below. Reject it up front.
	if data.MessageData == nil {
		log.Println("Echo response is missing message data")
		return
	}

	// authenticate message content
	if valid := e.node.authenticateMessage(data, data.MessageData); !valid {
		log.Println("Failed to authenticate message")
		return
	}

	// locate request data and remove it if found
	id := data.MessageData.Id
	req, ok := e.requests[id]
	if !ok {
		log.Println("Failed to locate request data boject for response")
		return
	}
	// remove request from map as we have processed it here
	delete(e.requests, id)

	// NOTE: log.Fatalln exits the process; kept as in the original demo.
	if req.Message != data.Message {
		log.Fatalln("Expected echo to respond with request message")
	}

	log.Printf("%s: Received echo response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), id, data.Message)
	e.done <- true
}
// Echo builds a signed echo request, records it in e.requests under its
// message id and sends it to host under the echoRequest protocol id.
//
// It returns true if sendProtoMessage reported success, and false if signing
// or sending failed. On a send failure the recorded request is removed again.
func (e *EchoProtocol) Echo(host host.Host) bool {
	log.Printf("%s: Sending echo to: %s....", e.node.ID(), host.ID())

	// create message data
	req := &pb.EchoRequest{
		MessageData: e.node.NewMessageData(uuid.New().String(), false),
		Message:     fmt.Sprintf("Echo from %s", e.node.ID()),
	}

	signature, err := e.node.signProtoMessage(req)
	if err != nil {
		log.Printf("failed to sign message: %v", err)
		return false
	}

	// add the signature to the message
	req.MessageData.Sign = signature

	// Store the request BEFORE sending it so the response handler has access
	// to it: the reply is processed by onEchoResponse independently of this
	// call, and if it were handled before the request is recorded it would
	// be rejected as unknown.
	// NOTE(review): e.requests is accessed here and in onEchoResponse without
	// locking; confirm whether stream handlers can run concurrently with
	// Echo and guard the map if so.
	id := req.MessageData.Id
	e.requests[id] = req

	if ok := e.node.sendProtoMessage(host.ID(), echoRequest, req); !ok {
		// nothing was sent, so no response can match this entry
		delete(e.requests, id)
		return false
	}

	log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.node.ID(), host.ID(), id, req.Message)
	return true
}