From 3e91762a7c9a874cb44fa814e5379541751b0877 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Mon, 18 Sep 2023 14:22:43 +0200 Subject: [PATCH] Proxy failed local requests to remote RPC server; closes #49 --- multiepoch.go | 142 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 90 insertions(+), 52 deletions(-) diff --git a/multiepoch.go b/multiepoch.go index 3b7385b2..79628ae4 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -162,6 +162,8 @@ type ListenerConfig struct { type ProxyConfig struct { Target string `json:"target" yaml:"target"` Headers map[string]string `json:"headers" yaml:"headers"` + // ProxyFailedRequests will proxy requests that fail to be handled by the local RPC server. + ProxyFailedRequests bool `json:"proxyFailedRequests" yaml:"proxyFailedRequests"` } func LoadProxyConfig(configFilepath string) (*ProxyConfig, error) { @@ -230,7 +232,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } klog.Infof("Will proxy unhandled RPC methods to %q", addr) } - return func(c *fasthttp.RequestCtx) { + return func(reqCtx *fasthttp.RequestCtx) { startedAt := time.Now() reqID := randomRequestID() defer func() { @@ -238,8 +240,8 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx }() { // make sure the method is POST - if !c.IsPost() { - replyJSON(c, http.StatusMethodNotAllowed, jsonrpc2.Response{ + if !reqCtx.IsPost() { + replyJSON(reqCtx, http.StatusMethodNotAllowed, jsonrpc2.Response{ Error: &jsonrpc2.Error{ Code: jsonrpc2.CodeMethodNotFound, Message: "Method not allowed", @@ -249,8 +251,8 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } // limit request body size - if c.Request.Header.ContentLength() > 1024 { - replyJSON(c, http.StatusRequestEntityTooLarge, jsonrpc2.Response{ + if reqCtx.Request.Header.ContentLength() > 1024 { + replyJSON(reqCtx, http.StatusRequestEntityTooLarge, jsonrpc2.Response{ Error: &jsonrpc2.Error{ Code: jsonrpc2.CodeInvalidRequest, Message: "Request entity too large", @@ -260,13 +262,13 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } } // read request body - body := c.Request.Body() + body := reqCtx.Request.Body() // parse request var rpcRequest jsonrpc2.Request if err := json.Unmarshal(body, &rpcRequest); err != nil { klog.Errorf("[%s] failed to parse request body: %v", err) - replyJSON(c, http.StatusBadRequest, jsonrpc2.Response{ + replyJSON(reqCtx, http.StatusBadRequest, jsonrpc2.Response{ Error: &jsonrpc2.Error{ Code: jsonrpc2.CodeParseError, Message: "Parse error", @@ -280,53 +282,25 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx if proxy != nil && !isValidLocalMethod(rpcRequest.Method) { klog.Infof("[%s] Unhandled method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) // proxy the request to the target - proxyReq := fasthttp.AcquireRequest() - defer fasthttp.ReleaseRequest(proxyReq) - { - for k, v := range lsConf.ProxyConfig.Headers { - proxyReq.Header.Set(k, v) - } - } - proxyReq.Header.SetMethod("POST") - proxyReq.Header.SetContentType("application/json") - proxyReq.SetRequestURI(lsConf.ProxyConfig.Target) - proxyReq.SetBody(body) - proxyResp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(proxyResp) - if err := proxy.Do(proxyReq, proxyResp); err != nil { - klog.Errorf("[%s] failed to proxy request: %v", reqID, err) - replyJSON(c, http.StatusInternalServerError, jsonrpc2.Response{ - Error: &jsonrpc2.Error{ - Code: jsonrpc2.CodeInternalError, - Message: "Internal error", - }, - }) - return - } - c.Response.Header.Set("Content-Type", "application/json") - c.Response.SetStatusCode(proxyResp.StatusCode()) - if rpcRequest.Method == "getVersion" { - enriched, err := handler.tryEnrichGetVersion(proxyResp.Body()) - if err != nil { - klog.Errorf("[%s] failed to enrich getVersion response: %v", reqID, err) - c.Response.SetBody(proxyResp.Body()) - } else { - c.Response.SetBody(enriched) - } - } else { - c.Response.SetBody(proxyResp.Body()) - } - // TODO: handle compression. + proxyToAlternativeRPCServer( + handler, + lsConf, + proxy, + reqCtx, + &rpcRequest, + body, + reqID, + ) return } - rqCtx := &requestContext{ctx: c} + rqCtx := &requestContext{ctx: reqCtx} method := rpcRequest.Method if method == "getVersion" { faithfulVersion := handler.GetFaithfulVersionInfo() err := rqCtx.ReplyRaw( - c, + reqCtx, rpcRequest.ID, map[string]any{ "faithful": faithfulVersion, @@ -339,21 +313,85 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } // errorResp is the error response to be sent to the client. - errorResp, err := handler.handleRequest(c, rqCtx, &rpcRequest) + errorResp, err := handler.handleRequest(reqCtx, rqCtx, &rpcRequest) if err != nil { klog.Errorf("[%s] failed to handle %s: %v", reqID, sanitizeMethod(method), err) } if errorResp != nil { - rqCtx.ReplyWithError( - c, - rpcRequest.ID, - errorResp, - ) + if proxy != nil && lsConf.ProxyConfig.ProxyFailedRequests { + klog.Infof("[%s] Failed local method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) + // proxy the request to the target + proxyToAlternativeRPCServer( + handler, + lsConf, + proxy, + reqCtx, + &rpcRequest, + body, + reqID, + ) + return + } else { + rqCtx.ReplyWithError( + reqCtx, + rpcRequest.ID, + errorResp, + ) + } return } } } +func proxyToAlternativeRPCServer( + handler *MultiEpoch, + lsConf *ListenerConfig, + proxy *fasthttp.HostClient, + reqCtx *fasthttp.RequestCtx, + rpcRequest *jsonrpc2.Request, + body []byte, + reqID string, +) { + // proxy the request to the target + proxyReq := fasthttp.AcquireRequest() + defer fasthttp.ReleaseRequest(proxyReq) + { + for k, v := range lsConf.ProxyConfig.Headers { + proxyReq.Header.Set(k, v) + } + } + proxyReq.Header.SetMethod("POST") + proxyReq.Header.SetContentType("application/json") + proxyReq.SetRequestURI(lsConf.ProxyConfig.Target) + proxyReq.SetBody(body) + proxyResp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(proxyResp) + if err := proxy.Do(proxyReq, proxyResp); err != nil { + klog.Errorf("[%s] failed to proxy request: %v", reqID, err) + replyJSON(reqCtx, http.StatusInternalServerError, jsonrpc2.Response{ + Error: &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "Internal error", + }, + }) + return + } + reqCtx.Response.Header.Set("Content-Type", "application/json") + reqCtx.Response.SetStatusCode(proxyResp.StatusCode()) + if rpcRequest.Method == "getVersion" { + enriched, err := handler.tryEnrichGetVersion(proxyResp.Body()) + if err != nil { + klog.Errorf("[%s] failed to enrich getVersion response: %v", reqID, err) + reqCtx.Response.SetBody(proxyResp.Body()) + } else { + reqCtx.Response.SetBody(enriched) + } + } else { + reqCtx.Response.SetBody(proxyResp.Body()) + } + // TODO: handle compression. +} + func sanitizeMethod(method string) string { if isValidLocalMethod(method) { return method