-
Notifications
You must be signed in to change notification settings - Fork 3
/
sidecar.go
205 lines (171 loc) · 5.46 KB
/
sidecar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package ringman
import (
"encoding/json"
"fmt"
"net/http"
"github.com/Nitro/sidecar/catalog"
"github.com/Nitro/sidecar/receiver"
"github.com/Nitro/sidecar/service"
"github.com/relistan/go-director"
log "github.com/sirupsen/logrus"
)
// DefaultReceiverCapacity is the capacity value handed to receiver.NewReceiver
// when NewSidecarRing constructs the Receiver for a SidecarRing.
const DefaultReceiverCapacity = 50
// A SidecarRing is a ring backed by service discovery from Sidecar
// https://github.com/Nitro/sidecar . Sidecar itself uses Memberlist under
// the covers, but layers a lot more on top. Sidecar takes care of all
// the work of managing and bootstrapping the cluster so we don't need
// to know anything about cluster seeds. This service is expected to
// subscribe to Sidecar events, however, and uses a Sidecar Receiver to
// process them.
type SidecarRing struct {
// Looper handed to manager.Run in NewSidecarRing; Shutdown calls Quit on it.
managerLooper director.Looper
// Hash ring manager that receives AddNode/RemoveNode calls and answers GetNode.
manager *HashRingManager
// Sidecar address used to fetch initial state; an empty string skips the fetch.
sidecarUrl string
// Name of the service whose instances are tracked in the ring.
svcName string
// ServicePort value used to pick the matching port entry of a service.
svcPort int64
// Receiver that is subscribed to svcName and invokes onUpdate.
rcvr *receiver.Receiver
nodes map[string]struct{} // Tracking which nodes we already know about
}
// Compile-time assertion that *SidecarRing implements the Ring interface.
var _ Ring = (*SidecarRing)(nil)
// NewSidecarRing returns a properly configured SidecarRing that will filter
// incoming changes by the service name provided and will only watch the
// ServicePort number passed in. If sidecarUrl is not the empty string, that
// address is called to fetch the initial state on bootstrap.
//
// The hash ring manager is started in its own goroutine before the receiver
// is created. If the initial state fetch fails, the manager's looper is told
// to quit before the error is returned, so a failed construction does not
// leave that goroutine running with no way for the caller to stop it.
func NewSidecarRing(sidecarUrl string, svcName string, svcPort int64) (*SidecarRing, error) {
	ringMgr := NewHashRingManager([]string{})
	looper := director.NewFreeLooper(director.FOREVER, nil)
	go ringMgr.Run(looper)

	scRing := &SidecarRing{
		manager:       ringMgr,
		managerLooper: looper,
		sidecarUrl:    sidecarUrl,
		svcName:       svcName,
		svcPort:       svcPort,
	}

	// Set up the receiver for incoming requests
	rcvr := receiver.NewReceiver(DefaultReceiverCapacity, scRing.onUpdate)

	// Subscribe to only the service requested
	rcvr.Subscribe(svcName)
	scRing.rcvr = rcvr

	// If we were given a Sidecar address to bootstrap from, then do it.
	// Otherwise we just wait for updates.
	if sidecarUrl != "" {
		if err := rcvr.FetchInitialState(sidecarUrl); err != nil {
			// The caller never receives the ring, so nobody could call
			// Shutdown; stop the manager loop here instead of leaking it.
			looper.Quit()
			return nil, err
		}
	}

	go rcvr.ProcessUpdates()

	return scRing, nil
}
// onUpdate is the callback registered with the receiver. It rebuilds the set
// of alive nodes for our service from the supplied state, then pushes the
// difference against the previously known set into the hash ring manager.
func (r *SidecarRing) onUpdate(state *catalog.ServicesState) {
	// Size hint only: the new set is likely close in size to the old one.
	current := make(map[string]struct{}, len(r.nodes)+5)

	state.EachService(func(_ *string, _ *string, svc *service.Service) {
		// Ignore other services, and anything that is not ALIVE.
		if svc.Name != r.svcName || !svc.IsAlive() {
			return
		}

		nodeKey, err := r.keyForService(svc)
		if err != nil {
			log.Error(err)
			return
		}

		current[nodeKey] = struct{}{}
	})

	// Present now but unknown before: add to the ring.
	for nodeKey := range current {
		if _, known := r.nodes[nodeKey]; known {
			continue
		}
		r.manager.AddNode(nodeKey)
	}

	// Known before but absent now: remove from the ring.
	for nodeKey := range r.nodes {
		if _, present := current[nodeKey]; present {
			continue
		}
		r.manager.RemoveNode(nodeKey)
	}

	// The freshly built set becomes the known set for the next update.
	r.nodes = current
}
// keyForService builds the hashring key for a service instance. The key has
// the form "host:port", taken from the first entry in svc.Ports whose
// ServicePort equals the ring's configured service port. The host is that
// entry's IP, or svc.Hostname when the IP is empty. An error is returned when
// no entry matches.
func (r *SidecarRing) keyForService(svc *service.Service) (string, error) {
	for i := range svc.Ports {
		candidate := &svc.Ports[i]
		if candidate.ServicePort != r.svcPort {
			continue
		}

		// Prefer the port's IP; fall back to the service hostname.
		host := candidate.IP
		if host == "" {
			host = svc.Hostname
		}

		return fmt.Sprintf("%s:%d", host, candidate.Port), nil
	}

	return "", fmt.Errorf(
		"Can't match service port %d for incoming service %s!",
		r.svcPort, svc.ID,
	)
}
// HttpListNodesHandler is an http.Handler that will return a JSON-encoded list of
// the Sidecar nodes in the current ring. The response body is the JSON encoding
// of the ring's known-node set (an object keyed by node key).
//
// A nil receiver is answered with a 500 and a JSON error body, matching
// HttpGetNodeHandler, instead of panicking on the nil dereference. A marshal
// failure is also answered with a 500.
func (r *SidecarRing) HttpListNodesHandler(w http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()

	if r == nil {
		http.Error(w, `{"status": "error", "message": "SidecarRing was nil"}`, http.StatusInternalServerError)
		return
	}

	jsonBytes, err := json.MarshalIndent(r.nodes, "", " ")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write(jsonBytes)
}
// HttpGetNodeHandler is an http.Handler that will return an object containing the
// node that currently owns a specific key. The key is read from the "key" form
// value of the request.
//
// Responses:
//   - 404 with a JSON error body when the key is missing or empty
//   - 500 with a JSON error body when the receiver is nil
//   - 500 with the error text when the response cannot be marshalled
//   - otherwise a JSON object with the fields "Node" and "Key"
func (r *SidecarRing) HttpGetNodeHandler(w http.ResponseWriter, req *http.Request) {
	defer req.Body.Close()

	key := req.FormValue("key")
	if key == "" {
		http.Error(w, `{"status": "error", "message": "Invalid key"}`, http.StatusNotFound)
		return
	}

	if r == nil {
		http.Error(w, `{"status": "error", "message": "SidecarRing was nil"}`, http.StatusInternalServerError)
		return
	}

	// NOTE(review): the error from GetNode is discarded, as in the original;
	// the response then carries whatever node value GetNode returned.
	// Confirm whether a lookup failure should be reported to the client.
	node, _ := r.manager.GetNode(key)

	respObj := struct {
		Node string
		Key  string
	}{node, key}

	jsonBytes, err := json.MarshalIndent(respObj, "", " ")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		// http.Error does not end the request, so stop here rather than
		// writing a body after the error response.
		return
	}

	w.Write(jsonBytes)
}
// HttpMux builds a fresh http.ServeMux with the SidecarRing's HTTP handlers
// mounted on it:
//
//	/nodes/get  -> HttpGetNodeHandler
//	/nodes      -> HttpListNodesHandler
//	/update     -> receiver.UpdateHandler, bound to this ring's receiver
//
// Use it as-is, or mount the handlers on a mux of your own choosing
// (e.g. Gorilla mux or httprouter).
func (r *SidecarRing) HttpMux() *http.ServeMux {
	router := http.NewServeMux()

	router.HandleFunc("/nodes/get", r.HttpGetNodeHandler)
	router.HandleFunc("/nodes", r.HttpListNodesHandler)
	router.HandleFunc("/update", func(w http.ResponseWriter, req *http.Request) {
		// Adapt the three-argument UpdateHandler to the http.HandlerFunc shape.
		receiver.UpdateHandler(w, req, r.rcvr)
	})

	return router
}
// Manager returns the HashRingManager that backs this SidecarRing.
func (r *SidecarRing) Manager() *HashRingManager {
	mgr := r.manager
	return mgr
}
// Shutdown stops the Receiver and the HashRingManager by calling Quit on the
// receiver's Looper first and then on the manager's looper.
func (r *SidecarRing) Shutdown() {
// Signal the receiver's loop to quit.
r.rcvr.Looper.Quit()
// Then signal the looper that was handed to the manager's Run.
r.managerLooper.Quit()
}