net_listener.go
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package shmipc

import (
	"errors"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

const defaultBacklog = 4096 // capacity of the channel holding accepted streams

type listener struct {
	listener net.Listener                 // the raw listener
	sessions map[*Session]*sync.WaitGroup // all established sessions
	mu       sync.Mutex                   // guards sessions
	closed   uint32                       // marks whether the raw listener was closed; checked when Accept returns an error
	closeCh  chan struct{}                // closed to make blocking selects return
	backlog  chan net.Conn                // all accepted streams (never closed, otherwise we might send on a closed channel)
}

// newListener creates the listener and starts the background accept loop.
func newListener(rawListener net.Listener, backlog int) *listener {
	listener := &listener{
		listener: rawListener,
		sessions: make(map[*Session]*sync.WaitGroup),
		backlog:  make(chan net.Conn, backlog),
		closeCh:  make(chan struct{}),
	}
	go listener.listenLoop()
	return listener
}

// listenLoop accepts connections from the raw listener in a loop.
// For every connection it spawns a goroutine that creates a session,
// registers it, and then accepts streams from that session.
func (l *listener) listenLoop() {
	for {
		conn, err := l.listener.Accept()
		if err != nil {
			if atomic.LoadUint32(&l.closed) == 1 {
				internalLogger.infof("listener closed: %s", err)
				return
			}
			internalLogger.errorf("error when accept: %s", err)
			continue
		}
		internalLogger.info("receive a new incoming raw connection")
		go func() {
			session, err := Server(conn, DefaultConfig())
			if err != nil {
				internalLogger.errorf("error when create session: %s", err)
				return
			}
			internalLogger.info("new session created")
			l.mu.Lock()
			if atomic.LoadUint32(&l.closed) == 1 {
				l.mu.Unlock()
				_ = session.Close()
				internalLogger.infof("listener is closed and the session should be closed")
				return
			}
			// Here we maintain a ref counter for every session.
			// The listener holds 1 ref, and every stream holds 1 ref.
			// The session is terminated only after the listener and all streams are closed.
			wg := new(sync.WaitGroup)
			wg.Add(1)
			l.sessions[session] = wg
			l.mu.Unlock()
			go func() {
				wg.Wait()
				_ = session.Close()
				internalLogger.infof("wait group finished, session is closed")
			}()
			for {
				stream, err := session.AcceptStream()
				if err != nil {
					if err != ErrSessionShutdown {
						internalLogger.errorf("error when accept new stream: %s", err)
					}
					_ = session.Close()
					internalLogger.infof("session is closed early: %s", err)
					l.mu.Lock()
					if _, ok := l.sessions[session]; ok {
						delete(l.sessions, session)
						wg.Done()
					}
					l.mu.Unlock()
					return
				}
				internalLogger.info("accepted a new stream")
				conn := newStreamWrapper(stream, stream.LocalAddr(), stream.RemoteAddr(), wg)
				select {
				case <-l.closeCh:
					return
				case l.backlog <- conn:
				}
			}
		}()
	}
}

// Accept returns a connection from the backlog channel.
func (l *listener) Accept() (net.Conn, error) {
	select {
	case conn := <-l.backlog:
		return conn, nil
	case <-l.closeCh:
		return nil, errors.New("listener is closed")
	}
}

// Close closes the listener and all established sessions, which would otherwise leak.
// Because the underlying connection is closed, all streams will be closed too.
// Note: the close behaviour here may differ from that of a normal listener.
func (l *listener) Close() (err error) {
	// mark closed and close the raw listener
	swapped := atomic.CompareAndSwapUint32(&l.closed, 0, 1)
	err = l.listener.Close()
	// close closeCh to make blocking calls return
	if swapped {
		close(l.closeCh)
	}
	// release the listener's ref on every session and clear the map to avoid leaks
	l.mu.Lock()
	for _, wg := range l.sessions {
		wg.Done()
	}
	l.sessions = map[*Session]*sync.WaitGroup{}
	l.mu.Unlock()
	return
}

// Addr is forwarded to the raw listener.
func (l *listener) Addr() net.Addr {
	return l.listener.Addr()
}

// Listen creates a listener with the default backlog size (4096).
// shmIPCAddress is the UDS address used for the underlying connection; the returned value is a net.Listener.
// Remember to close the listener if it is created successfully, or goroutines may leak.
//
// Should I use Listen?
// If you want the best performance, use the low-level API (not this one) and marshal/unmarshal manually,
// which achieves better batching.
// If you mainly care about compatibility, use this high-level API. For example, if you can hardly change gRPC
// and protobuf, this listener keeps them working while still improving performance somewhat.
func Listen(shmIPCAddress string) (net.Listener, error) {
	return ListenWithBacklog(shmIPCAddress, defaultBacklog)
}
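
// Usage sketch (an illustration, not part of the original file): a minimal echo
// server built on the net.Listener returned by Listen. The socket path below is
// hypothetical and error handling is reduced to keep the sketch short.
//
//	ln, err := Listen("/tmp/example_shmipc.sock")
//	if err != nil {
//		panic(err)
//	}
//	defer ln.Close() // required, or background goroutines may leak
//	for {
//		conn, err := ln.Accept()
//		if err != nil {
//			break // the listener was closed
//		}
//		go func(c net.Conn) {
//			defer c.Close()
//			buf := make([]byte, 4096)
//			n, err := c.Read(buf)
//			if err != nil {
//				return
//			}
//			_, _ = c.Write(buf[:n]) // echo back what was read
//		}(conn)
//	}
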
// ListenWithBacklog create listener with given backlog size
// shmIPCAddress is uds address used as underlying connection, the returned value is net.Listener
// Remember close the listener if it is created successfully, or goroutine may leak
// Should I use ListenWithBacklog?
// If you want the best performance, you should use low level API(not this one) to marshal and unmarshal manually,
// which can achieve better batch results.
// If you just care about the compatibility, you can use this high level API. For example, you can hardly change grpc
// and protobuf, then you can use this listener to make it compatible with a little bit improved performance.
func ListenWithBacklog(shmIPCAddress string, backlog int) (net.Listener, error) {
rawListener, err := net.Listen("unix", shmIPCAddress)
if err != nil {
return nil, err
}
return newListener(rawListener, backlog), nil
}
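
// Compatibility sketch (an assumption, not part of the original file): because the
// returned value is a plain net.Listener, it can be handed to any server that accepts
// one, e.g. google.golang.org/grpc. The backlog value and socket path are hypothetical.
//
//	ln, err := ListenWithBacklog("/tmp/example_shmipc.sock", 1024)
//	if err != nil {
//		panic(err)
//	}
//	defer ln.Close()
//	srv := grpc.NewServer()
//	// register gRPC services on srv here
//	_ = srv.Serve(ln) // Serve consumes net.Conn streams produced by the shmipc listener
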
// newStreamWrapper wraps a stream so that it implements net.Conn.
func newStreamWrapper(stream *Stream, localAddr, remoteAddr net.Addr, wg *sync.WaitGroup) net.Conn {
	wg.Add(1)
	return &streamWrapper{stream: stream, localAddr: localAddr, remoteAddr: remoteAddr, wg: wg}
}

type streamWrapper struct {
	stream     *Stream
	localAddr  net.Addr
	remoteAddr net.Addr
	closed     uint32
	wg         *sync.WaitGroup
}

func (s *streamWrapper) Read(b []byte) (n int, err error) {
	return s.stream.copyRead(b)
}

func (s *streamWrapper) Write(b []byte) (n int, err error) {
	return s.stream.copyWriteAndFlush(b)
}

func (s *streamWrapper) Close() error {
	if atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
		_ = s.stream.Close()
		s.wg.Done()
	}
	return nil
}

func (s *streamWrapper) LocalAddr() net.Addr {
	return s.localAddr
}

func (s *streamWrapper) RemoteAddr() net.Addr {
	return s.remoteAddr
}

func (s *streamWrapper) SetDeadline(t time.Time) error {
	return s.stream.SetDeadline(t)
}

func (s *streamWrapper) SetReadDeadline(t time.Time) error {
	return s.stream.SetReadDeadline(t)
}

func (s *streamWrapper) SetWriteDeadline(t time.Time) error {
	return s.stream.SetWriteDeadline(t)
}