Skip to content

Commit

Permalink
added a test for a load balancer (#1046)
Browse files Browse the repository at this point in the history
* working connection server, prober

* lint
  • Loading branch information
omerlavanet authored Dec 14, 2023
1 parent 24e9203 commit 2938439
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cmd/lavap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lavanet/lava/ecosystem/cache"
"github.com/lavanet/lava/protocol/badgegenerator"
"github.com/lavanet/lava/protocol/monitoring"
"github.com/lavanet/lava/protocol/performance/connection"
"github.com/lavanet/lava/protocol/rpcconsumer"
"github.com/lavanet/lava/protocol/rpcprovider"
"github.com/lavanet/lava/protocol/statetracker"
Expand Down Expand Up @@ -53,6 +54,8 @@ func main() {
testCmd.AddCommand(rpcconsumer.CreateTestRPCConsumerCobraCommand())
testCmd.AddCommand(rpcprovider.CreateTestRPCProviderCobraCommand())
testCmd.AddCommand(statetracker.CreateEventsCobraCommand())
testCmd.AddCommand(connection.CreateTestConnectionServerCobraCommand())
testCmd.AddCommand(connection.CreateTestConnectionProbeCobraCommand())
testCmd.AddCommand(monitoring.CreateHealthCobraCommand())
rootCmd.AddCommand(cache.CreateCacheCobraCommand())
if err := svrcmd.Execute(rootCmd, "", app.DefaultNodeHome); err != nil {
Expand Down
33 changes: 33 additions & 0 deletions config/health_examples/health_template_gen.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
max-provider-latency: 150ms
subscription-days-left-alert: 10
interval: 5s
allowed_time_lag: 30s
query-retries: 5
identifier: health_example
cu-percent-threshold: 0.2
alert-suppression-interval: 60s
disable-alert-suppression: false
suppression-alert-count-threshold: 2
metrics-listen-address: ":7776"
disable-alert-logging: false
allow-insecure-provider-dialing: true
consumer_endpoints:
- chain-id: ETH1
api-interface: jsonrpc
network-address: http://127.0.0.1:3333
- chain-id: LAV1
api-interface: rest
network-address: http://127.0.0.1:3360
- chain-id: LAV1
api-interface: tendermintrpc
network-address: http://127.0.0.1:3361
- chain-id: LAV1
api-interface: grpc
network-address: 127.0.0.1:3362
#REPLACED
subscription_addresses:
- lava@1dpr9377jlt0jkcrdetjvp4hlzeypacz2yeehkf
provider_addresses:
- lava@1ue77wfp7nru7kwh6ca2twrs4l4t9wgwndf0h43
- lava@14q33eq0emr95jw5urjcfamylrz3z6qsyjv2wpl
-
162 changes: 162 additions & 0 deletions protocol/performance/connection/connection_cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package connection

import (
"context"
"errors"
"net/http"
"os"
"os/signal"
"time"

"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
)

const (
intervalFlagName = "interval"
disableTLSFlagName = "disable-tls"
)

func CreateTestConnectionServerCobraCommand() *cobra.Command {
cmdTestConnectionServer := &cobra.Command{
Use: `connection-server [listen_address:port]`,
Short: `test an incoming connection`,
Long: `sets up a grpc server listning to the probe grpc query and logs connection`,
Example: `connection-server 127.0.0.1:3333`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
utils.LavaFormatInfo("Connection-server started")
ctx := context.Background()
listenAddr := args[0]
ctx, cancel := context.WithCancel(ctx)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
defer func() {
signal.Stop(signalChan)
cancel()
}()

// GRPC
lis := chainlib.GetListenerWithRetryGrpc("tcp", listenAddr)
serverReceiveMaxMessageSize := grpc.MaxRecvMsgSize(1024 * 1024 * 32) // setting receive size to 32mb instead of 4mb default
grpcServer := grpc.NewServer(serverReceiveMaxMessageSize)

wrappedServer := grpcweb.WrapServer(grpcServer)
handler := func(resp http.ResponseWriter, req *http.Request) {
// Set CORS headers
resp.Header().Set("Access-Control-Allow-Origin", "*")
resp.Header().Set("Access-Control-Allow-Headers", "Content-Type, x-grpc-web, lava-sdk-relay-timeout")

wrappedServer.ServeHTTP(resp, req)
}

httpServer := http.Server{
Handler: h2c.NewHandler(http.HandlerFunc(handler), &http2.Server{}),
}

disableTLS := viper.GetBool(disableTLSFlagName)
var serveExecutor func() error
if disableTLS {
utils.LavaFormatWarning("Running with disabled TLS configuration", nil)
serveExecutor = func() error { return httpServer.Serve(lis) }
} else {
NetworkAddressData := lavasession.NetworkAddressData{}
httpServer.TLSConfig = lavasession.GetTlsConfig(NetworkAddressData)
serveExecutor = func() error { return httpServer.ServeTLS(lis, "", "") }
}

guid := utils.GenerateUniqueIdentifier()
utils.LavaFormatInfo("running with unique identifier", utils.LogAttr("guid", guid))
Server := &RelayerConnectionServer{guid: guid}

pairingtypes.RegisterRelayerServer(grpcServer, Server)

go func() {
select {
case <-ctx.Done():
_ = utils.LavaFormatInfo("connection-server ctx.Done")
case <-signalChan:
_ = utils.LavaFormatInfo("connection-server signalChan")
}

shutdownCtx, shutdownRelease := context.WithTimeout(ctx, 10*time.Second)
defer shutdownRelease()

if err := httpServer.Shutdown(shutdownCtx); err != nil {
utils.LavaFormatFatal("connection-server failed to shutdown", err)
}
}()

utils.LavaFormatInfo("connection-server active", utils.Attribute{Key: "address", Value: listenAddr})
if err := serveExecutor(); !errors.Is(err, http.ErrServerClosed) {
utils.LavaFormatFatal("connection-server failed to serve", err, utils.Attribute{Key: "Address", Value: lis.Addr().String()})
}
utils.LavaFormatInfo("connection-server closed server", utils.Attribute{Key: "address", Value: listenAddr})

return nil
},
}
cmdTestConnectionServer.Flags().Bool(disableTLSFlagName, false, "used to disable tls in the server, if false will serve secure grpc with a floating certificate")
return cmdTestConnectionServer
}

func CreateTestConnectionProbeCobraCommand() *cobra.Command {
cmdTestConnectionProbe := &cobra.Command{
Use: `connection-probe [target_address:port]`,
Short: `test an a grpc server by probing`,
Long: `sets up a grpc client probing the provided grpc server`,
Example: `connection-probe 127.0.0.1:3333 --interval 2s`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
utils.LavaFormatInfo("Connection-prober started")
ctx, cancel := context.WithCancel(context.Background())
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
defer func() {
signal.Stop(signalChan)
cancel()
}()
lavasession.AllowInsecureConnectionToProviders = viper.GetBool(lavasession.AllowInsecureConnectionToProvidersFlag)
if lavasession.AllowInsecureConnectionToProviders {
utils.LavaFormatWarning("AllowInsecureConnectionToProviders is set to true, this should be used only in development", nil, utils.Attribute{Key: lavasession.AllowInsecureConnectionToProvidersFlag, Value: lavasession.AllowInsecureConnectionToProviders})
}
address := args[0]
prober := NewProber(address)
utils.LavaFormatInfo("[+] making a run")
err := prober.RunOnce(ctx)
if err != nil {
utils.LavaFormatError("failed a run", err)
}
interval := viper.GetDuration(intervalFlagName)
if interval > 0*time.Second {
ticker := time.NewTicker(interval) // initially every block we check for a polling time
for {
select {
case <-ticker.C:
utils.LavaFormatInfo("[+] making a run")
err = prober.RunOnce(ctx)
utils.LavaFormatError("failed a run", err)
case <-ctx.Done():
utils.LavaFormatInfo("prober ctx.Done")
return nil
case <-signalChan:
utils.LavaFormatInfo("prober signalChan")
return nil
}
}
}
return err
},
}
cmdTestConnectionProbe.Flags().Bool(lavasession.AllowInsecureConnectionToProvidersFlag, false, "allow insecure provider-dialing. used for development and testing without TLS")
cmdTestConnectionProbe.Flags().Duration(intervalFlagName, 0, "the interval duration for the health check, (defaults to 0s) if 0 runs once")
return cmdTestConnectionProbe
}
58 changes: 58 additions & 0 deletions protocol/performance/connection/prober.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package connection

import (
"context"
"fmt"
"math/rand"

"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type Prober struct {
address string
conn *grpc.ClientConn
relayerClient *pairingtypes.RelayerClient
}

func NewProber(addrss string) *Prober {
return &Prober{address: addrss}
}

func createConnection(ctx context.Context, address string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) {
cswp := lavasession.ConsumerSessionsWithProvider{}
return cswp.ConnectRawClientWithTimeout(ctx, address)
}

func (p *Prober) RunOnce(ctx context.Context) error {
if p.address == "" {
return fmt.Errorf("can't run with address empty")
}
if p.conn == nil || p.relayerClient == nil {
relayer, conn, err := createConnection(ctx, p.address)
if err != nil {
return err
}
p.relayerClient = relayer
p.conn = conn
}
relayerClient := *p.relayerClient
guid := uint64(rand.Int63())

probeReq := &pairingtypes.ProbeRequest{
Guid: guid,
SpecId: "prober",
ApiInterface: "",
}
var trailer metadata.MD
utils.LavaFormatInfo("[+] sending probe", utils.LogAttr("guid", guid))
probeResp, err := relayerClient.Probe(ctx, probeReq, grpc.Trailer(&trailer))
if err != nil {
return err
}
utils.LavaFormatInfo("probe response", utils.LogAttr("guid", probeResp.Guid))
return nil
}
31 changes: 31 additions & 0 deletions protocol/performance/connection/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package connection

import (
"context"
"fmt"

"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
)

type RelayerConnectionServer struct {
pairingtypes.UnimplementedRelayerServer
guid uint64
}

func (rs *RelayerConnectionServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
return nil, fmt.Errorf("unimplemented")
}

func (rs *RelayerConnectionServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) {
peerAddress := common.GetIpFromGrpcContext(ctx)
utils.LavaFormatInfo("received probe", utils.LogAttr("incoming-ip", peerAddress))
return &pairingtypes.ProbeReply{
Guid: rs.guid,
}, nil
}

func (rs *RelayerConnectionServer) RelaySubscribe(request *pairingtypes.RelayRequest, srv pairingtypes.Relayer_RelaySubscribeServer) error {
return fmt.Errorf("unimplemented")
}

0 comments on commit 2938439

Please sign in to comment.