forked from TykTechnologies/tyk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
middleware_virtual_endpoint.go
299 lines (238 loc) · 8.02 KB
/
middleware_virtual_endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package main
import (
"bytes"
b64 "encoding/base64"
"encoding/json"
"github.com/gorilla/context"
"github.com/lonelycode/tykcommon"
"github.com/mitchellh/mapstructure"
"io"
"io/ioutil"
"net/http"
"strconv"
"time"
)
// RequestObject is marshalled to JSON string and pased into JSON middleware
type RequestObject struct {
Headers map[string][]string
Body string
URL string
Params map[string][]string
}
type ResponseObject struct {
Body string
Headers map[string]string
Code int
}
type VMResponseObject struct {
Response ResponseObject
SessionMeta map[string]string
}
// DynamicMiddleware is a generic middleware that will execute JS code before continuing
type VirtualEndpoint struct {
*TykMiddleware
sh SuccessHandler
}
func PreLoadVirtualMetaCode(meta *tykcommon.VirtualMeta, j *JSVM) {
if j == nil {
log.Error("No JSVM loaded, cannot init methods")
return
}
if meta != nil {
if meta.FunctionSourceType == "file" {
js, loadErr := ioutil.ReadFile(meta.FunctionSourceURI)
if loadErr != nil {
log.Error("Failed to load Endpoint JS: ", loadErr)
} else {
// No error, load the JS into the VM
log.Debug("Loading JS Endpoint File: ", meta.FunctionSourceURI)
j.VM.Run(js)
}
} else if meta.FunctionSourceType == "blob" {
if config.DisableVirtualPathBlobs {
log.Error("[JSVM] Blobs not allowerd on this node")
return
}
js, loadErr := b64.StdEncoding.DecodeString(meta.FunctionSourceURI)
if loadErr != nil {
log.Error("Failed to load blob JS: ", loadErr)
} else {
// No error, load the JS into the VM
log.Debug("Loading JS blob")
j.VM.Run(js)
}
} else {
log.Error("Type must be either file or blob (b64)!")
}
}
}
type VirtualEndpointConfig struct {
ConfigData map[string]string `mapstructure:"config_data" bson:"config_data" json:"config_data"`
}
// New lets you do any initialisations for the object can be done here
func (d *VirtualEndpoint) New() {
d.sh = SuccessHandler{d.TykMiddleware}
}
// GetConfig retrieves the configuration from the API config - we user mapstructure for this for simplicity
func (d *VirtualEndpoint) GetConfig() (interface{}, error) {
var thisModuleConfig VirtualEndpointConfig
err := mapstructure.Decode(d.TykMiddleware.Spec.APIDefinition.RawData, &thisModuleConfig)
if err != nil {
log.Error(err)
return nil, err
}
return thisModuleConfig, nil
}
func (d *VirtualEndpoint) ServeHTTPForCache(w http.ResponseWriter, r *http.Request) *http.Response {
// Check if we are even using this MW
var stat RequestStatus
var meta interface{}
var found bool
_, versionPaths, _, _ := d.TykMiddleware.Spec.GetVersionData(r)
found, meta = d.TykMiddleware.Spec.CheckSpecMatchesStatus(r.URL.Path, r.Method, versionPaths, VirtualPath)
if found {
stat = StatusVirtualPath
} else {
return nil
}
if stat != StatusVirtualPath {
return nil
}
var copiedRequest *http.Request
if config.AnalyticsConfig.EnableDetailedRecording {
copiedRequest = CopyHttpRequest(r)
}
t1 := time.Now().UnixNano()
thisMeta := meta.(*tykcommon.VirtualMeta)
// Create the proxy object
defer r.Body.Close()
originalBody, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error("Failed to read request body! ", err)
return nil
}
thisRequestData := RequestObject{
Headers: r.Header,
Body: string(originalBody),
URL: r.URL.Path,
}
// We need to copy the body _back_ for the decode
r.Body = nopCloser{bytes.NewBuffer(originalBody)}
r.ParseForm()
thisRequestData.Params = r.Form
asJsonRequestObj, encErr := json.Marshal(thisRequestData)
if encErr != nil {
log.Error("Failed to encode request object for virtual endpoint: ", encErr)
return nil
}
// Encode the configuration data too
configData, cErr := d.GetConfig()
if cErr != nil {
log.Error("Failed to parse configuration data: ", cErr)
configData = make(map[string]string)
}
asJsonConfigData, encErr := json.Marshal(configData)
if encErr != nil {
log.Error("Failed to encode request object for virtual endpoint: ", encErr)
return nil
}
var thisSessionState = SessionState{}
var authHeaderValue = ""
// Encode the session object (if not a pre-process)
if thisMeta.UseSession {
thisSessionState = context.Get(r, SessionData).(SessionState)
authHeaderValue = context.Get(r, AuthHeaderValue).(string)
}
sessionAsJsonObj, sessEncErr := json.Marshal(thisSessionState)
if sessEncErr != nil {
log.Error("Failed to encode session for VM: ", sessEncErr)
return nil
}
// Run the middleware
thisVM := d.Spec.JSVM.VM.Copy()
returnRaw, _ := thisVM.Run(thisMeta.ResponseFunctionName + `(` + string(asJsonRequestObj) + `, ` + string(sessionAsJsonObj) + `, ` + string(asJsonConfigData) + `);`)
returnDataStr, _ := returnRaw.ToString()
// Decode the return object
newResponseData := VMResponseObject{}
decErr := json.Unmarshal([]byte(returnDataStr), &newResponseData)
if decErr != nil {
log.Error("Failed to decode virtual endpoint response data on return from VM: ", decErr)
log.Error("--> Returned: ", returnDataStr)
return nil
}
// Save the sesison data (if modified)
if thisMeta.UseSession {
thisSessionState.MetaData = newResponseData.SessionMeta
d.Spec.SessionManager.UpdateSession(authHeaderValue, thisSessionState, 0)
}
log.Debug("JSVM Virtual Endpoint execution took: (ns) ", time.Now().UnixNano()-t1)
responseMessage := []byte(newResponseData.Response.Body)
// Create an http.Response object so we can send it tot he cache middleware
newResponse := new(http.Response)
newResponse.Header = make(map[string][]string)
requestTime := time.Now().UTC().Format(http.TimeFormat)
for header, value := range newResponseData.Response.Headers {
newResponse.Header.Add(header, value)
}
newResponse.ContentLength = int64(len(responseMessage))
newResponse.Body = ioutil.NopCloser(bytes.NewReader(responseMessage))
newResponse.StatusCode = newResponseData.Response.Code
newResponse.Proto = "HTTP/1.0"
newResponse.ProtoMajor = 1
newResponse.ProtoMinor = 0
newResponse.Header.Add("Server", "tyk")
newResponse.Header.Add("Date", requestTime)
// Handle response middleware
ResponseHandler := ResponseChain{}
chainErr := ResponseHandler.Go(d.TykMiddleware.Spec.ResponseChain, w, newResponse, r, &thisSessionState)
if chainErr != nil {
log.Error("Response chain failed! ", chainErr)
}
// deep logging
var copiedResponse *http.Response
if config.AnalyticsConfig.EnableDetailedRecording {
copiedResponse = CopyHttpResponse(newResponse)
}
// Clone the response so we can save it
copiedRes := new(http.Response)
*copiedRes = *newResponse // includes shallow copies of maps, but okay
defer newResponse.Body.Close()
// Buffer body data
var bodyBuffer bytes.Buffer
bodyBuffer2 := new(bytes.Buffer)
io.Copy(&bodyBuffer, newResponse.Body)
*bodyBuffer2 = bodyBuffer
// Create new ReadClosers so we can split output
newResponse.Body = ioutil.NopCloser(&bodyBuffer)
copiedRes.Body = ioutil.NopCloser(bodyBuffer2)
d.HandleResponse(w, newResponse, &thisSessionState)
// Record analytics
go d.sh.RecordHit(w, r, 0, newResponse.StatusCode, copiedRequest, copiedResponse)
return copiedRes
}
// ProcessRequest will run any checks on the request on the way through the system, return an error to have the chain fail
func (d *VirtualEndpoint) ProcessRequest(w http.ResponseWriter, r *http.Request, configuration interface{}) (error, int) {
res := d.ServeHTTPForCache(w, r)
if res == nil {
return nil, 200
}
return nil, 666
}
func (d *VirtualEndpoint) HandleResponse(rw http.ResponseWriter, res *http.Response, ses *SessionState) error {
defer res.Body.Close()
// Close connections
if config.CloseConnections {
res.Header.Set("Connection", "close")
}
// Add resource headers
if ses != nil {
// We have found a session, lets report back
res.Header.Add("X-RateLimit-Limit", strconv.Itoa(int(ses.QuotaMax)))
res.Header.Add("X-RateLimit-Remaining", strconv.Itoa(int(ses.QuotaRemaining)))
res.Header.Add("X-RateLimit-Reset", strconv.Itoa(int(ses.QuotaRenews)))
}
copyHeader(rw.Header(), res.Header)
rw.WriteHeader(res.StatusCode)
io.Copy(rw, res.Body)
return nil
}