Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for VM gRPC services #3294

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions api/server/grpc_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package server

import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
)

var (
_ http.Handler = (*grpcRouter)(nil)

ErrDuplicateHandler = errors.New("duplicate handler")
)

func newGRPCRouter() *grpcRouter {
return &grpcRouter{
handlers: make(map[string]http.Handler),
}
}

type grpcRouter struct {
lock sync.RWMutex
handlers map[string]http.Handler
}

func (g *grpcRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
g.lock.RLock()
defer g.lock.RUnlock()

// Requests take the form of "/Service/Method"
parsed := strings.Split(r.RequestURI, "/")
if len(parsed) < 2 {
w.WriteHeader(http.StatusBadRequest)
return
}

handler, ok := g.handlers[parsed[1]]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}

handler.ServeHTTP(w, r)
}

func (g *grpcRouter) Add(serviceName string, handler http.Handler) error {
g.lock.Lock()
defer g.lock.Unlock()

if _, ok := g.handlers[serviceName]; ok {
return fmt.Errorf("failed to register %s: %w", serviceName, ErrDuplicateHandler)
}

g.handlers[serviceName] = handler
return nil
}
71 changes: 71 additions & 0 deletions api/server/grpc_router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package server

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
)

// TODO refactor RegisterChain to return an error so these tests can be against
// the exported package api
func TestGRPCRouterAdd(t *testing.T) {
require := require.New(t)
g := newGRPCRouter()
h := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})

require.NoError(g.Add("foo", h))

err := g.Add("foo", h)
require.ErrorIs(err, ErrDuplicateHandler)
}

func TestGRPCRouterServeHTTP(t *testing.T) {
tests := []struct {
name string
handlers []string
uri string
wantCode int
}{
{
name: "invalid request",
uri: "foobar",
wantCode: http.StatusBadRequest,
},
{
name: "invalid handler",
uri: "/foo/method",
wantCode: http.StatusNotFound,
},
{
name: "valid handler",
handlers: []string{"foo"},
uri: "/foo/method",
wantCode: http.StatusOK,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)

g := newGRPCRouter()
h := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
w := httptest.NewRecorder()
r := httptest.NewRequest("", "/", nil)

r.RequestURI = tt.uri

for _, handler := range tt.handlers {
require.NoError(g.Add(handler, h))
}

g.ServeHTTP(w, r)
require.Equal(tt.wantCode, w.Code)
})
}
}
84 changes: 61 additions & 23 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"net/http"
"net/url"
"path"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/ava-labs/avalanchego/api"
"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -90,7 +92,8 @@ type server struct {
metrics *metrics

// Maps endpoints to handlers
router *router
router *router
grpcRouter *grpcRouter

srv *http.Server

Expand Down Expand Up @@ -118,33 +121,27 @@ func New(
}

router := newRouter()
allowedHostsHandler := filterInvalidHosts(router, allowedHosts)
corsHandler := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowCredentials: true,
}).Handler(allowedHostsHandler)
gzipHandler := gziphandler.GzipHandler(corsHandler)
var handler http.Handler = http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// Attach this node's ID as a header
w.Header().Set("node-id", nodeID.String())
gzipHandler.ServeHTTP(w, r)
},
)
handler := wrapHandler(router, nodeID, allowedOrigins, allowedHosts)

grpcRouter := newGRPCRouter()
grpcHandler := wrapHandler(grpcRouter, nodeID, allowedOrigins, allowedHosts)

httpServer := &http.Server{
Handler: handler,
Handler: h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
grpcHandler.ServeHTTP(w, r)
return
}

handler.ServeHTTP(w, r)
}), &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}),
ReadTimeout: httpConfig.ReadTimeout,
ReadHeaderTimeout: httpConfig.ReadHeaderTimeout,
WriteTimeout: httpConfig.WriteTimeout,
IdleTimeout: httpConfig.IdleTimeout,
}
err = http2.ConfigureServer(httpServer, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
})
if err != nil {
return nil, err
}

log.Info("API created",
zap.Strings("allowedOrigins", allowedOrigins),
Expand All @@ -158,6 +155,7 @@ func New(
tracer: tracer,
metrics: m,
router: router,
grpcRouter: grpcRouter,
srv: httpServer,
listener: listener,
}, nil
Expand Down Expand Up @@ -203,6 +201,26 @@ func (s *server) RegisterChain(chainName string, ctx *snow.ConsensusContext, vm
)
}
}

ctx.Lock.Lock()
serviceName, grpcHandler, err := vm.CreateGRPCService(context.TODO())
ctx.Lock.Unlock()
if err != nil {
s.log.Error("failed to create grpc service",
zap.String("chainName", chainName),
zap.Error(err),
)
return
}

if serviceName == "" && grpcHandler == nil {
return
}

grpcHandler = s.wrapMiddleware(chainName, grpcHandler, ctx)
if err := s.grpcRouter.Add(serviceName, grpcHandler); err != nil {
s.log.Error("failed to add route to grpc service", zap.Error(err))
}
}

func (s *server) addChainRoute(chainName string, handler http.Handler, ctx *snow.ConsensusContext, base, endpoint string) error {
Expand All @@ -211,13 +229,17 @@ func (s *server) addChainRoute(chainName string, handler http.Handler, ctx *snow
zap.String("url", url),
zap.String("endpoint", endpoint),
)
handler = s.wrapMiddleware(chainName, handler, ctx)
return s.router.AddRouter(url, endpoint, handler)
}

func (s *server) wrapMiddleware(chainName string, handler http.Handler, ctx *snow.ConsensusContext) http.Handler {
if s.tracingEnabled {
handler = api.TraceHandler(handler, chainName, s.tracer)
}
// Apply middleware to reject calls to the handler before the chain finishes bootstrapping
handler = rejectMiddleware(handler, ctx)
handler = s.metrics.wrapHandler(chainName, handler)
return s.router.AddRouter(url, endpoint, handler)
return s.metrics.wrapHandler(chainName, handler)
}

func (s *server) AddRoute(handler http.Handler, base, endpoint string) error {
Expand Down Expand Up @@ -303,3 +325,19 @@ func (a readPathAdder) AddRoute(handler http.Handler, base, endpoint string) err
func (a readPathAdder) AddAliases(endpoint string, aliases ...string) error {
return a.pather.AddAliasesWithReadLock(endpoint, aliases...)
}

func wrapHandler(handler http.Handler, nodeID ids.NodeID, allowedOrigins []string, allowedHosts []string) http.Handler {
allowedHostsHandler := filterInvalidHosts(handler, allowedHosts)
corsHandler := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowCredentials: true,
}).Handler(allowedHostsHandler)
gzipHandler := gziphandler.GzipHandler(corsHandler)
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
// Attach this node's ID as a header
w.Header().Set("node-id", nodeID.String())
gzipHandler.ServeHTTP(w, r)
},
)
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/antithesishq/antithesis-sdk-go v0.3.8
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240815193440-a96bc921e732
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240904185842-9f8bae991b35
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
Expand Down Expand Up @@ -97,7 +97,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/frankban/quicktest v1.14.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax
github.com/antithesishq/antithesis-sdk-go v0.3.8 h1:OvGoHxIcOXFJLyn9IJQ5DzByZ3YVAWNBc394ObzDRb8=
github.com/antithesishq/antithesis-sdk-go v0.3.8/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240815193440-a96bc921e732 h1:wlhGJbmb7s3bU2QWtxKjscGjfHknQiq+cVhhUjONsB8=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240815193440-a96bc921e732/go.mod h1:RkQLaQ961Xe/sUb3ycn4Qi18vPPuEetTqDf2eDcquAs=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240904185842-9f8bae991b35 h1:/V7dZ7wtl3GyT+dnTtTazmETY0cCvgzj2XAJ2NM/bhI=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240904185842-9f8bae991b35/go.mod h1:8AhBlYab66lSW1c57TX0Gwxuf7/aqqaKUH184kUJFQg=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95 h1:dOVbtdnZL++pENdTCNZ1nu41eYDQkTML4sWebDnnq8c=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down Expand Up @@ -191,8 +191,6 @@ github.com/ethereum/go-ethereum v1.13.8 h1:1od+thJel3tM52ZUNQwvpYOeRHlbkVFZ5S8fh
github.com/ethereum/go-ethereum v1.13.8/go.mod h1:sc48XYQxCzH3fG9BcrXCOOgQk2JfZzNAmIKnceogzsA=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c=
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down
12 changes: 7 additions & 5 deletions proto/http/http.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ syntax = "proto3";

package http;

import "google/protobuf/empty.proto";

option go_package = "github.com/ava-labs/avalanchego/proto/pb/http";

service HTTP {
// Handle wraps http1 over http2 and provides support for websockets by implementing
// net conn and responsewriter in http2.
rpc Handle(HTTPRequest) returns (google.protobuf.Empty);
rpc Handle(HTTPRequest) returns (HttpResponse);
// HandleSimple wraps http1 requests over http2 similar to Handle but only passes headers
// and body bytes. Because the request and response are single protos with no inline
// gRPC servers the CPU cost as well as file descriptor overhead is less
Expand Down Expand Up @@ -108,8 +106,6 @@ message Request {
// header contains the request header fields either received
// by the server or to be sent by the client
repeated Element header = 6;
// body is the request payload in bytes
bytes body = 7;
// content_length records the length of the associated content
int64 content_length = 8;
// transfer_encoding lists the transfer encodings from outermost to
Expand Down Expand Up @@ -149,6 +145,12 @@ message HTTPRequest {
Request request = 2;
}

message HttpResponse {
// header is sent to synchronize any header modifications from the plugin
// process
repeated Element header = 1;
Copy link
Contributor Author

@joshua-kim joshua-kim Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grpc requires a header to indicate the end of an rpc, which is directly set onto the map in the plugin process (ref). We need to propagate this update back into avalanchego (I believe current rpcchainvm is not handling plugin-side header updates correctly).

}

message HandleSimpleHTTPRequest {
// method specifies the HTTP method (GET, POST, PUT, etc.)
string method = 1;
Expand Down
17 changes: 17 additions & 0 deletions proto/http/request/request.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
syntax = "proto3";

package http.request;

option go_package = "github.com/ava-labs/avalanchego/proto/pb/http/request";

service Request {
rpc Body(BodyRequest) returns (BodyReply);
}

message BodyRequest {
uint32 n = 1;
}

message BodyReply {
bytes body = 1;
}
Loading