forked from philippseith/signalr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
serversseconnection.go
96 lines (90 loc) · 2.32 KB
/
serversseconnection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package signalr
import (
"context"
"errors"
"fmt"
"github.com/teivah/onecontext"
"io"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
)
type serverSSEConnection struct {
ConnectionBase
mx sync.Mutex
postWriting bool
postWriter io.Writer
postReader io.Reader
mxWriter sync.Mutex
sseWriter io.Writer
sseFlusher http.Flusher
}
func newServerSSEConnection(parentContext context.Context, requestContext context.Context,
connectionID string, writer http.ResponseWriter) (*serverSSEConnection, error) {
sseFlusher, ok := writer.(http.Flusher)
if !ok {
return nil, errors.New("connection over Server Sent Events not supported with http.ResponseWriter: http.Flusher not implemented")
}
ctx, _ := onecontext.Merge(parentContext, requestContext)
s := serverSSEConnection{
ConnectionBase: ConnectionBase{
ctx: ctx,
connectionID: connectionID,
},
sseWriter: writer,
sseFlusher: sseFlusher,
}
s.postReader, s.postWriter = io.Pipe()
return &s, nil
}
func (s *serverSSEConnection) consumeRequest(request *http.Request) int {
if err := s.Context().Err(); err != nil {
return 410 // Gone
}
s.mx.Lock()
if s.postWriting {
s.mx.Unlock()
return 409 // Conflict
}
s.postWriting = true
s.mx.Unlock()
defer func() {
_ = request.Body.Close()
}()
body, err := ioutil.ReadAll(request.Body)
if err != nil {
return 400 // Bad request
} else if _, err := s.postWriter.Write(body); err != nil {
return 500 // Server error
}
s.mx.Lock()
s.postWriting = false
s.mx.Unlock()
<-time.After(50 * time.Millisecond)
return 200
}
func (s *serverSSEConnection) Read(p []byte) (n int, err error) {
if err := s.Context().Err(); err != nil {
return 0, fmt.Errorf("serverSSEConnection canceled: %w", s.ctx.Err())
}
return s.postReader.Read(p)
}
func (s *serverSSEConnection) Write(p []byte) (n int, err error) {
if err := s.Context().Err(); err != nil {
return 0, fmt.Errorf("serverSSEConnection canceled: %w", s.ctx.Err())
}
payload := ""
for _, line := range strings.Split(strings.TrimRight(string(p), "\n"), "\n") {
payload = payload + "data: " + line + "\n"
}
// The Write/Flush sequence might be called on different threads, so keep it atomic
s.mxWriter.Lock()
n, err = s.sseWriter.Write([]byte(payload + "\n"))
if err == nil {
s.sseFlusher.Flush()
}
s.mxWriter.Unlock()
return n, err
}