Skip to content

Commit

Permalink
Proxy failed local requests to remote RPC server; closes #49
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Sep 18, 2023
1 parent a03ae27 commit 3e91762
Showing 1 changed file with 90 additions and 52 deletions.
142 changes: 90 additions & 52 deletions multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ type ListenerConfig struct {
type ProxyConfig struct {
Target string `json:"target" yaml:"target"`
Headers map[string]string `json:"headers" yaml:"headers"`
// ProxyFailedRequests will proxy requests that fail to be handled by the local RPC server.
ProxyFailedRequests bool `json:"proxyFailedRequests" yaml:"proxyFailedRequests"`
}

func LoadProxyConfig(configFilepath string) (*ProxyConfig, error) {
Expand Down Expand Up @@ -230,16 +232,16 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx
}
klog.Infof("Will proxy unhandled RPC methods to %q", addr)
}
return func(c *fasthttp.RequestCtx) {
return func(reqCtx *fasthttp.RequestCtx) {
startedAt := time.Now()
reqID := randomRequestID()
defer func() {
klog.Infof("[%s] request took %s", reqID, time.Since(startedAt))
}()
{
// make sure the method is POST
if !c.IsPost() {
replyJSON(c, http.StatusMethodNotAllowed, jsonrpc2.Response{
if !reqCtx.IsPost() {
replyJSON(reqCtx, http.StatusMethodNotAllowed, jsonrpc2.Response{
Error: &jsonrpc2.Error{
Code: jsonrpc2.CodeMethodNotFound,
Message: "Method not allowed",
Expand All @@ -249,8 +251,8 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx
}

// limit request body size
if c.Request.Header.ContentLength() > 1024 {
replyJSON(c, http.StatusRequestEntityTooLarge, jsonrpc2.Response{
if reqCtx.Request.Header.ContentLength() > 1024 {
replyJSON(reqCtx, http.StatusRequestEntityTooLarge, jsonrpc2.Response{
Error: &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidRequest,
Message: "Request entity too large",
Expand All @@ -260,13 +262,13 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx
}
}
// read request body
body := c.Request.Body()
body := reqCtx.Request.Body()

// parse request
var rpcRequest jsonrpc2.Request
if err := json.Unmarshal(body, &rpcRequest); err != nil {
klog.Errorf("[%s] failed to parse request body: %v", err)
replyJSON(c, http.StatusBadRequest, jsonrpc2.Response{
replyJSON(reqCtx, http.StatusBadRequest, jsonrpc2.Response{
Error: &jsonrpc2.Error{
Code: jsonrpc2.CodeParseError,
Message: "Parse error",
Expand All @@ -280,53 +282,25 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx
if proxy != nil && !isValidLocalMethod(rpcRequest.Method) {
klog.Infof("[%s] Unhandled method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr)
// proxy the request to the target
proxyReq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(proxyReq)
{
for k, v := range lsConf.ProxyConfig.Headers {
proxyReq.Header.Set(k, v)
}
}
proxyReq.Header.SetMethod("POST")
proxyReq.Header.SetContentType("application/json")
proxyReq.SetRequestURI(lsConf.ProxyConfig.Target)
proxyReq.SetBody(body)
proxyResp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(proxyResp)
if err := proxy.Do(proxyReq, proxyResp); err != nil {
klog.Errorf("[%s] failed to proxy request: %v", reqID, err)
replyJSON(c, http.StatusInternalServerError, jsonrpc2.Response{
Error: &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
},
})
return
}
c.Response.Header.Set("Content-Type", "application/json")
c.Response.SetStatusCode(proxyResp.StatusCode())
if rpcRequest.Method == "getVersion" {
enriched, err := handler.tryEnrichGetVersion(proxyResp.Body())
if err != nil {
klog.Errorf("[%s] failed to enrich getVersion response: %v", reqID, err)
c.Response.SetBody(proxyResp.Body())
} else {
c.Response.SetBody(enriched)
}
} else {
c.Response.SetBody(proxyResp.Body())
}
// TODO: handle compression.
proxyToAlternativeRPCServer(
handler,
lsConf,
proxy,
reqCtx,
&rpcRequest,
body,
reqID,
)
return
}

rqCtx := &requestContext{ctx: c}
rqCtx := &requestContext{ctx: reqCtx}
method := rpcRequest.Method

if method == "getVersion" {
faithfulVersion := handler.GetFaithfulVersionInfo()
err := rqCtx.ReplyRaw(
c,
reqCtx,
rpcRequest.ID,
map[string]any{
"faithful": faithfulVersion,
Expand All @@ -339,21 +313,85 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx
}

// errorResp is the error response to be sent to the client.
errorResp, err := handler.handleRequest(c, rqCtx, &rpcRequest)
errorResp, err := handler.handleRequest(reqCtx, rqCtx, &rpcRequest)
if err != nil {
klog.Errorf("[%s] failed to handle %s: %v", reqID, sanitizeMethod(method), err)
}
if errorResp != nil {
rqCtx.ReplyWithError(
c,
rpcRequest.ID,
errorResp,
)
if proxy != nil && lsConf.ProxyConfig.ProxyFailedRequests {
klog.Infof("[%s] Failed local method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr)
// proxy the request to the target
proxyToAlternativeRPCServer(
handler,
lsConf,
proxy,
reqCtx,
&rpcRequest,
body,
reqID,
)
return
} else {
rqCtx.ReplyWithError(
reqCtx,
rpcRequest.ID,
errorResp,
)
}
return
}
}
}

func proxyToAlternativeRPCServer(
handler *MultiEpoch,
lsConf *ListenerConfig,
proxy *fasthttp.HostClient,
reqCtx *fasthttp.RequestCtx,
rpcRequest *jsonrpc2.Request,
body []byte,
reqID string,
) {
// proxy the request to the target
proxyReq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(proxyReq)
{
for k, v := range lsConf.ProxyConfig.Headers {
proxyReq.Header.Set(k, v)
}
}
proxyReq.Header.SetMethod("POST")
proxyReq.Header.SetContentType("application/json")
proxyReq.SetRequestURI(lsConf.ProxyConfig.Target)
proxyReq.SetBody(body)
proxyResp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(proxyResp)
if err := proxy.Do(proxyReq, proxyResp); err != nil {
klog.Errorf("[%s] failed to proxy request: %v", reqID, err)
replyJSON(reqCtx, http.StatusInternalServerError, jsonrpc2.Response{
Error: &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
},
})
return
}
reqCtx.Response.Header.Set("Content-Type", "application/json")
reqCtx.Response.SetStatusCode(proxyResp.StatusCode())
if rpcRequest.Method == "getVersion" {
enriched, err := handler.tryEnrichGetVersion(proxyResp.Body())
if err != nil {
klog.Errorf("[%s] failed to enrich getVersion response: %v", reqID, err)
reqCtx.Response.SetBody(proxyResp.Body())
} else {
reqCtx.Response.SetBody(enriched)
}
} else {
reqCtx.Response.SetBody(proxyResp.Body())
}
// TODO: handle compression.
}

func sanitizeMethod(method string) string {
if isValidLocalMethod(method) {
return method
Expand Down

0 comments on commit 3e91762

Please sign in to comment.