-
Notifications
You must be signed in to change notification settings - Fork 295
/
ipc.go
263 lines (227 loc) · 7.21 KB
/
ipc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// Copyright (c) 2016-2021 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
)
// Messages sent over a pipe are encoded using a simple binary message format:
//
// - Protocol version (1 byte, currently 1)
// - Message type length (1 byte)
// - Message type string (encoded as UTF8, no longer than 255 bytes)
// - Message payload length (4 bytes, little endian)
// - Message payload bytes (no longer than 2^32 - 1 bytes)
type pipeMessage interface {
// Type returns the message type string.  The sender writes its length as
// a single byte followed by the string itself.
Type() string
// PayloadSize returns the number of payload bytes.  The sender writes
// this value to the header as 4 little endian bytes before the payload.
PayloadSize() uint32
// WritePayload writes the message payload to w.  It is called after the
// header has been written.
WritePayload(w io.Writer) error
}
// outgoingPipeMessages is the unbuffered channel from which
// serviceControlPipeTx receives the messages it encodes and writes to the
// outgoing pipe.
var outgoingPipeMessages = make(chan pipeMessage)
// serviceControlPipeRx reads from the file descriptor fd of a read end pipe.
// This is intended to be used as a simple control mechanism for parent
// processes to communicate with and manage the lifetime of a dcrd child
// process using a unidirectional pipe (on Windows, this is an anonymous pipe,
// not a named pipe).
//
// When the pipe is closed or any other errors occur reading the control
// message, shutdown begins. This prevents dcrd from continuing to run
// unsupervised after the parent process closes unexpectedly.
//
// No control messages are currently defined and the only use for the pipe is to
// start clean shutdown when the pipe is closed. Control messages that follow
// the pipe message format can be added later as needed.
func serviceControlPipeRx(fd uintptr) {
	name := fmt.Sprintf("|%v", fd)
	reader := bufio.NewReader(os.NewFile(fd, name))

	// No control messages are defined, so everything read is thrown away.
	// The loop only ends once a read fails; reaching the end of the pipe is
	// the expected way out and is not reported as an error.
	for {
		_, readErr := reader.Discard(1024)
		if readErr == nil {
			continue
		}
		if !errors.Is(readErr, io.EOF) {
			dcrdLog.Errorf("Failed to read from pipe: %v", readErr)
		}
		break
	}

	// Request shutdown without blocking: when the request cannot be sent
	// immediately it is dropped.
	select {
	case shutdownRequestChannel <- struct{}{}:
	default:
	}
}
// serviceControlPipeTx sends pipe messages to the file descriptor fd of a write
// end pipe.  This is intended to be a simple response and notification system
// for a child dcrd process to communicate with a parent process without the
// need to go through the RPC server.
//
// Messages are received from outgoingPipeMessages and written one at a time,
// flushing the pipe after each message.  A message whose type string is longer
// than 255 bytes cannot be described by the single-byte length field, so it is
// logged and dropped instead of being written with a truncated length.
//
// The function returns when a write fails, in which case the error is logged,
// or when outgoingPipeMessages is closed.  On return, any further messages
// are received and discarded by drainOutgoingPipeMessages.
//
// See the comment on the pipeMessage interface for the binary encoding of a
// pipe message.
func serviceControlPipeTx(fd uintptr) {
	defer drainOutgoingPipeMessages()

	const protocolVersion byte = 1
	const maxTypeLen = 255 // largest value the 1-byte length field can hold

	pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
	w := bufio.NewWriter(pipe)
	headerBuffer := make([]byte, 0, 1+1+maxTypeLen+4) // capped to max header size
	var psize [4]byte                                 // reused for every message
	var err error
	for m := range outgoingPipeMessages {
		mtype := m.Type()
		if len(mtype) > maxTypeLen {
			// Writing byte(len(mtype)) here would wrap around and produce
			// a frame the reader cannot parse.
			dcrdLog.Errorf("Dropping pipe message with oversized type "+
				"(%d bytes, max %d)", len(mtype), maxTypeLen)
			continue
		}

		// Assemble the header in the reused buffer.
		headerBuffer = append(headerBuffer[:0], protocolVersion, byte(len(mtype)))
		headerBuffer = append(headerBuffer, mtype...)
		binary.LittleEndian.PutUint32(psize[:], m.PayloadSize())
		headerBuffer = append(headerBuffer, psize[:]...)

		if _, err = w.Write(headerBuffer); err != nil {
			break
		}
		if err = m.WritePayload(w); err != nil {
			break
		}
		if err = w.Flush(); err != nil {
			break
		}
	}

	// The loop also ends without an error when the channel is closed, so only
	// report genuine write failures.
	if err != nil {
		dcrdLog.Errorf("Failed to write to pipe: %v", err)
	}
}
// drainOutgoingPipeMessages receives and discards every message sent on
// outgoingPipeMessages.  It only returns once that channel has been closed.
func drainOutgoingPipeMessages() {
	for {
		if _, open := <-outgoingPipeMessages; !open {
			return
		}
	}
}
// The lifetimeEvent describes a startup or shutdown event. The message type
// string is "lifetimeevent".
//
// The payload size is always 2 bytes long. The first byte describes whether a
// service or event is about to run or whether startup has completed. The
// second byte, when applicable, describes which event or service is about to
// start or stop.
//
// 0 <event id>: The startup event is about to run
// 1 <ignored>: All startup tasks have completed
// 2 <event id>: The shutdown event is about to run
//
// Event IDs can take on the following values:
//
// 0: Database opening/closing
// 1: Peer-to-peer server starting/stopping
//
// Note that not all subsystems are started/stopped or events run during the
// program's lifetime depending on what features are enabled through the config.
//
// As an example, the following messages may be sent during a typical execution:
//
// 0 0: The database is being opened
// 0 1: The P2P server is starting
// 1 0: All startup tasks have completed
// 2 1: The P2P server is stopping
// 2 0: The database is being closed
type lifetimeEvent struct {
	// event is written as the first payload byte.
	event lifetimeEventID
	// action is written as the second payload byte.
	action lifetimeAction
}

// Ensure *lifetimeEvent satisfies the pipeMessage interface at compile time.
var _ pipeMessage = (*lifetimeEvent)(nil)

// lifetimeEventID is the first payload byte of a lifetimeEvent.
type lifetimeEventID byte

const (
	// startupEvent (0) marks a startup event that is about to run.
	startupEvent lifetimeEventID = iota
	// startupComplete (1) marks that all startup tasks have completed.
	startupComplete
	// shutdownEvent (2) marks a shutdown event that is about to run.
	shutdownEvent
)

// lifetimeAction is the second payload byte of a lifetimeEvent.
type lifetimeAction byte

const (
	// lifetimeEventDBOpen (0) refers to the database opening/closing.
	lifetimeEventDBOpen lifetimeAction = iota
	// lifetimeEventP2PServer (1) refers to the peer-to-peer server
	// starting/stopping.
	lifetimeEventP2PServer
)

// Type returns the message type string of a lifetime event.
func (e *lifetimeEvent) Type() string {
	return "lifetimeevent"
}

// PayloadSize returns the payload length, which is always 2 bytes.
func (e *lifetimeEvent) PayloadSize() uint32 {
	return 2
}

// WritePayload writes the event byte followed by the action byte to w with a
// single Write call and returns the error from that call.
func (e *lifetimeEvent) WritePayload(w io.Writer) error {
	payload := [2]byte{byte(e.event), byte(e.action)}
	_, err := w.Write(payload[:])
	return err
}
// lifetimeEventServer sends lifetimeEvent messages on a pipe message channel.
//
// A nil server is valid: its notify* methods return without sending anything.
type lifetimeEventServer chan<- pipeMessage

// newLifetimeEventServer returns a lifetimeEventServer that sends its events
// on outChan.
func newLifetimeEventServer(outChan chan<- pipeMessage) lifetimeEventServer {
	return lifetimeEventServer(outChan)
}

// notifyStartupEvent sends a startupEvent message carrying action.  The send
// blocks until the message is accepted by the channel.
func (s lifetimeEventServer) notifyStartupEvent(action lifetimeAction) {
	if s != nil {
		s <- &lifetimeEvent{event: startupEvent, action: action}
	}
}

// notifyStartupComplete sends a startupComplete message.  The action byte is
// left at zero.
func (s lifetimeEventServer) notifyStartupComplete() {
	if s != nil {
		s <- &lifetimeEvent{event: startupComplete}
	}
}

// notifyShutdownEvent sends a shutdownEvent message carrying action.  The send
// blocks until the message is accepted by the channel.
func (s lifetimeEventServer) notifyShutdownEvent(action lifetimeAction) {
	if s != nil {
		s <- &lifetimeEvent{event: shutdownEvent, action: action}
	}
}
// boundP2PListenAddrEvent are IPC events emitted by dcrd with the bound local
// addresses for the P2P interface. Multiple events of this type may be
// generated.
//
// The type of this event is "p2plistenaddr" and the payload is the UTF-8
// encoded address.
type boundP2PListenAddrEvent string

// Type returns the message type string of a P2P listen address event.
func (e boundP2PListenAddrEvent) Type() string {
	return "p2plistenaddr"
}

// PayloadSize returns the length of the address in bytes.
func (e boundP2PListenAddrEvent) PayloadSize() uint32 {
	size := len(e)
	return uint32(size)
}

// WritePayload writes the bytes of the address to w with a single Write call
// and returns the error from that call.
func (e boundP2PListenAddrEvent) WritePayload(w io.Writer) error {
	payload := []byte(e)
	_, err := w.Write(payload)
	return err
}
// boundRPCListenAddrEvent are IPC events emitted by dcrd with the bound local
// addresses for the RPC interface. Multiple events of this type may be
// generated.
//
// The type of this event is "rpclistenaddr" and the payload is the UTF-8
// encoded address.
type boundRPCListenAddrEvent string

// Type returns the message type string of an RPC listen address event.
func (e boundRPCListenAddrEvent) Type() string {
	return "rpclistenaddr"
}

// PayloadSize returns the length of the address in bytes.
func (e boundRPCListenAddrEvent) PayloadSize() uint32 {
	size := len(e)
	return uint32(size)
}

// WritePayload writes the bytes of the address to w with a single Write call
// and returns the error from that call.
func (e boundRPCListenAddrEvent) WritePayload(w io.Writer) error {
	payload := []byte(e)
	_, err := w.Write(payload)
	return err
}
// boundAddrEventServer is a server that can notify parent processes, via the
// IPC mechanism, the local addresses to which specific subsystems are bound
// during initialization.
//
// The empty value is a valid server and will not send any events if its
// notify* methods are called.
type boundAddrEventServer chan<- pipeMessage

// newBoundAddrEventServer returns a boundAddrEventServer that sends its events
// on outChan.
func newBoundAddrEventServer(outChan chan<- pipeMessage) boundAddrEventServer {
	return boundAddrEventServer(outChan)
}

// notifyP2PAddress sends addr as a boundP2PListenAddrEvent.  Nothing is sent
// when the server is nil; otherwise the send blocks until the message is
// accepted by the channel.
func (s boundAddrEventServer) notifyP2PAddress(addr string) {
	if s != nil {
		s <- boundP2PListenAddrEvent(addr)
	}
}

// notifyRPCAddress sends addr as a boundRPCListenAddrEvent.  Nothing is sent
// when the server is nil; otherwise the send blocks until the message is
// accepted by the channel.
func (s boundAddrEventServer) notifyRPCAddress(addr string) {
	if s != nil {
		s <- boundRPCListenAddrEvent(addr)
	}
}