-
Notifications
You must be signed in to change notification settings - Fork 6
/
request.go
107 lines (92 loc) · 2.06 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package grid
import (
"context"
"errors"
"sync"
"github.com/lytics/grid/v3/codec"
)
// ErrAlreadyResponded is the error Respond returns when it is called
// on a request that has already been marked as finished.
var ErrAlreadyResponded = errors.New("already responded")
// constAck is the single shared Ack value that request.Ack hands to
// Respond, so acknowledging does not allocate a new Ack per call.
var constAck = &Ack{}
// Request is a received message that must be answered, either with
// an ack or with a response.
type Request interface {
	// Msg returns the message carried by the request.
	Msg() interface{}

	// Context returns the context associated with the request.
	Context() context.Context

	// Respond answers the request with msg.
	Respond(msg interface{}) error

	// Ack answers the request with an acknowledgement instead of
	// a caller-supplied message.
	Ack() error
}
// newRequest builds the request state used by the server for one
// inbound message. It stores ctx and msg unchanged and creates the
// failure and response channels, each with a buffer of one so that a
// single send on either can complete without a receiver waiting.
func newRequest(ctx context.Context, msg interface{}) *request {
	req := new(request)
	req.ctx = ctx
	req.msg = msg
	req.failure = make(chan error, 1)
	req.response = make(chan *Delivery, 1)
	return req
}
// request is the state behind a Request: the inbound message and its
// context, plus the two channels Respond uses to publish its outcome.
type request struct {
// mu is held by Respond for the duration of the call.
mu sync.Mutex
// msg is the value returned by Msg.
msg interface{}
// ctx is the value returned by Context.
ctx context.Context
// failure receives the value passed to Respond when that value
// is an error.
failure chan error
// response receives the encoded Delivery when Respond is given a
// non-error message.
response chan *Delivery
// finished is checked and set by Respond while holding mu; once
// set, Respond returns ErrAlreadyResponded.
finished bool
}
// Context returns the context that was supplied to newRequest when
// this request was created.
func (req *request) Context() context.Context {
return req.ctx
}
// Msg returns the message that was supplied to newRequest when this
// request was created. The stored value is returned as is.
func (req *request) Msg() interface{} {
return req.msg
}
// Ack acknowledges the request. It is shorthand for calling Respond
// with the package-level constAck value and returns whatever Respond
// returns, including ErrAlreadyResponded on a repeated call.
func (req *request) Ack() error {
return req.Respond(constAck)
}
// Respond answers the request with msg.
//
// If msg is an error it is delivered on the failure channel.
// Otherwise msg is encoded with codec.Marshal and delivered on the
// response channel wrapped in a Delivery.
//
// Respond returns ErrAlreadyResponded if a response has already been
// delivered. If encoding msg fails, the encoding error is returned
// and the request is left open, so the caller can still respond,
// for example by passing that error back through Respond.
//
// Respond panics only if a delivery channel is unexpectedly full,
// which would mean the single-response invariant has been broken.
func (req *request) Respond(msg interface{}) error {
	req.mu.Lock()
	defer req.mu.Unlock()

	if req.finished {
		return ErrAlreadyResponded
	}

	if fail, ok := msg.(error); ok {
		// Mark finished only on a path that actually delivers.
		req.finished = true
		select {
		case req.failure <- fail:
			return nil
		default:
			panic("grid: respond called multiple times")
		}
	}

	// Encode the message here, in the thread of
	// execution of the caller.
	typeName, data, err := codec.Marshal(msg)
	if err != nil {
		// Nothing was delivered, so the request must stay open;
		// marking it finished here would make it unanswerable.
		return err
	}
	req.finished = true

	res := &Delivery{
		Ver:      Delivery_V1,
		Data:     data,
		TypeName: typeName,
	}

	// Send the response bytes. Again, the bytes need
	// to be generated by the thread of execution of
	// the caller of Respond.
	select {
	case req.response <- res:
		return nil
	default:
		panic("grid: respond called multiple times")
	}
}