feat: PRT-add-grpc-compression-for-provider-consumer-communication #1440

Merged
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
@@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
func isGrpcServerUp(url string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true, false)
conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true, false, false)
if err != nil {
return false
}
14 changes: 11 additions & 3 deletions protocol/lavasession/common.go
@@ -24,14 +24,15 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
)

const (
MaxConsecutiveConnectionAttempts = 5
TimeoutForEstablishingAConnection = 1500 * time.Millisecond // 1.5 seconds
MaxSessionsAllowedPerProvider = 1000 // Max number of sessions allowed per provider
MaxAllowedBlockListedSessionPerProvider = 3
MaximumNumberOfFailuresAllowedPerConsumerSession = 3
MaxAllowedBlockListedSessionPerProvider = MaxSessionsAllowedPerProvider / 3
MaximumNumberOfFailuresAllowedPerConsumerSession = 15
RelayNumberIncrement = 1
DataReliabilitySessionId = 0 // data reliability session id is 0. we can change to more sessions later if needed.
DataReliabilityRelayNumber = 1
@@ -62,7 +63,7 @@ func IsSessionSyncLoss(err error) bool {
return code == codes.Code(SessionOutOfSyncError.ABCICode())
}

func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool, skipTLS bool) (*grpc.ClientConn, error) {
func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool, skipTLS bool, allowCompression bool) (*grpc.ClientConn, error) {
var opts []grpc.DialOption

if skipTLS {
@@ -93,6 +94,13 @@ func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool,
}))
}

// allow gzip compression for grpc.
if allowCompression {
opts = append(opts, grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name), // Use gzip compression for provider consumer communication
))
}

conn, err := grpc.DialContext(ctx, address, opts...)
return conn, err
}
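For context on how the new `allowCompression` knob behaves: importing `google.golang.org/grpc/encoding/gzip` registers the gzip compressor with grpc-go's encoding registry, so a server built from the same codebase can transparently decompress requests, and setting `grpc.UseCompressor(gzip.Name)` as a default call option opts every outgoing RPC in. A minimal sketch of a caller enabling it (the address is illustrative; everything else comes from this diff):

```go
package main

import (
	"context"
	"log"

	"github.com/lavanet/lava/protocol/lavasession"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), lavasession.TimeoutForEstablishingAConnection)
	defer cancel()

	// allowInsecure=false, skipTLS=false, allowCompression=true
	conn, err := lavasession.ConnectGRPCClient(ctx, "provider.example.com:443", false, false, true)
	if err != nil {
		log.Fatalf("failed to dial provider: %v", err)
	}
	defer conn.Close()
	// Every RPC issued on conn now carries gzip-compressed request messages.
}
```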
101 changes: 101 additions & 0 deletions protocol/lavasession/common_test.go
@@ -1,11 +1,22 @@
package lavasession

import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"strings"
"testing"
"time"

"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
planstypes "github.com/lavanet/lava/x/plans/types"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
)

type printGeos []*Endpoint
@@ -111,3 +122,93 @@ func TestGeoOrdering(t *testing.T) {
})
}
}

type RelayerConnectionServer struct {
pairingtypes.UnimplementedRelayerServer
guid uint64
}

func (rs *RelayerConnectionServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
return nil, fmt.Errorf("unimplemented")
}

func (rs *RelayerConnectionServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) {
// peerAddress := common.GetIpFromGrpcContext(ctx)
// utils.LavaFormatInfo("received probe", utils.LogAttr("incoming-ip", peerAddress))
return &pairingtypes.ProbeReply{
Guid: rs.guid,
}, nil
}

func (rs *RelayerConnectionServer) RelaySubscribe(request *pairingtypes.RelayRequest, srv pairingtypes.Relayer_RelaySubscribeServer) error {
return fmt.Errorf("unimplemented")
}

func startServer() (*grpc.Server, net.Listener) {
listen := ":0"
lis, err := net.Listen("tcp", listen)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
tlsConfig := GetTlsConfig(NetworkAddressData{})
srv := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
pairingtypes.RegisterRelayerServer(srv, &RelayerConnectionServer{})
go func() {
if err := srv.Serve(lis); err != nil {
log.Println("test finished:", err)
}
}()
return srv, lis
}

// Note: when benchmarking locally, compressed calls will probably be outperformed by
// uncompressed ones, since the compression overhead dominates when network latency is
// negligible; communication across real networks should benefit from the smaller payloads.
func BenchmarkGRPCServer(b *testing.B) {
srv, lis := startServer()
address := lis.Addr().String()
defer srv.Stop()
defer lis.Close()

csp := &ConsumerSessionsWithProvider{}
for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, address)
cancel() // release the timeout context on every iteration, not only on success
if err != nil {
utils.LavaFormatDebug("waiting for grpc server to launch")
continue
}
break
}

runBenchmark := func(b *testing.B, opts ...grpc.DialOption) {
var tlsConf tls.Config
tlsConf.InsecureSkipVerify = true
credentials := credentials.NewTLS(&tlsConf)
opts = append(opts, grpc.WithTransportCredentials(credentials))
conn, err := grpc.DialContext(context.Background(), address, opts...)
if err != nil {
b.Fatalf("failed to dial server: %v", err)
}
defer conn.Close()

client := pairingtypes.NewRelayerClient(conn)

b.ResetTimer()
for i := 0; i < b.N; i++ {
client.Probe(context.Background(), &pairingtypes.ProbeRequest{Guid: 125, SpecId: "EVMOS", ApiInterface: "jsonrpc"})
}
}

b.Run("WithoutCompression", func(b *testing.B) {
runBenchmark(b)
})

b.Run("WithCompression", func(b *testing.B) {
runBenchmark(b, grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name), // Use gzip compression for outgoing messages
))
})

time.Sleep(3 * time.Second)
}
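Running `go test -bench=BenchmarkGRPCServer -benchmem ./protocol/lavasession` from the repository root should exercise both sub-benchmarks; per the note above, expect `WithCompression` to come out slower over loopback, where gzip's CPU cost is not offset by any savings in transfer time.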
8 changes: 4 additions & 4 deletions protocol/lavasession/consumer_session_manager.go
@@ -885,7 +885,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
consumerSession.LatestRelayCu = 0 // making sure no one uses it in a wrong way
consecutiveErrors := uint64(len(consumerSession.ConsecutiveErrors))
parentConsumerSessionsWithProvider := consumerSession.Parent // must read this pointer before unlocking
csm.updateMetricsManager(consumerSession)
csm.updateMetricsManager(consumerSession, time.Duration(0), false)
// finished with consumerSession here can unlock.
consumerSession.Free(errorReceived) // we unlock before we change anything in the parent ConsumerSessionsWithProvider

@@ -961,13 +961,13 @@ func (csm *ConsumerSessionManager) OnSessionDone(
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
csm.updateMetricsManager(consumerSession)
csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency metrics only for non-hanging APIs
return nil
}

// updates QoS metrics for a provider
// consumerSession should still be locked when accessing this method, as it fetches information from the session itself
func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession) {
func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession, relayLatency time.Duration, sessionSuccessful bool) {
if csm.consumerMetricsManager == nil {
return
}
@@ -988,7 +988,7 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC
publicProviderAddress := consumerSession.Parent.PublicLavaAddress

go func() {
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful)
// in case we blocked the session add it to our block sessions metric
if blockedSession {
csm.consumerMetricsManager.AddNumberOfBlockedSessionMetric(chainId, apiInterface, publicProviderAddress)
7 changes: 5 additions & 2 deletions protocol/lavasession/consumer_session_manager_test.go
@@ -26,7 +26,7 @@ import (
const (
parallelGoRoutines = 40
numberOfProviders = 10
numberOfResetsToTest = 10
numberOfResetsToTest = 1
numberOfAllowedSessionsPerConsumer = 10
firstEpochHeight = 20
secondEpochHeight = 40
@@ -428,6 +428,8 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
// shorten the pairing list, otherwise we won't be able to ban all providers since banning now takes slightly longer
pairingList = map[uint64]*ConsumerSessionsWithProvider{0: pairingList[0]}
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)

@@ -438,6 +440,7 @@
break
}
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
err = csm.OnSessionFailure(cs.Session, nil)
@@ -811,7 +814,7 @@ func TestContext(t *testing.T) {

func TestGrpcClientHang(t *testing.T) {
ctx := context.Background()
conn, err := ConnectGRPCClient(ctx, grpcListener, true, false)
conn, err := ConnectGRPCClient(ctx, grpcListener, true, false, false)
require.NoError(t, err)
client := pairingtypes.NewRelayerClient(conn)
err = conn.Close()
12 changes: 9 additions & 3 deletions protocol/lavasession/consumer_types.go
@@ -38,9 +38,15 @@ func (list EndpointInfoList) Swap(i, j int) {
list[i], list[j] = list[j], list[i]
}

const AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
const (
AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
AllowGRPCCompressionFlag = "allow-grpc-compression-for-consumer-provider-communication"
)

var AllowInsecureConnectionToProviders = false
var (
AllowInsecureConnectionToProviders = false
AllowGRPCCompressionForConsumerProviderCommunication = false
)

type UsedProvidersInf interface {
RemoveUsed(providerAddress string, err error)
@@ -301,7 +307,7 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er
func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) {
connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection)
defer cancel()
conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders, false)
conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders, false, AllowGRPCCompressionForConsumerProviderCommunication)
if err != nil {
return nil, nil, err
}
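The diff defines the flag name and the package-level toggle but doesn't show where they get bound. Below is a hypothetical cobra wiring sketch: only `AllowGRPCCompressionFlag` and `AllowGRPCCompressionForConsumerProviderCommunication` come from this PR, the command itself is an assumption for illustration.

```go
package main

import (
	"github.com/lavanet/lava/protocol/lavasession"
	"github.com/spf13/cobra"
)

func main() {
	// Hypothetical consumer command, not part of this diff.
	cmd := &cobra.Command{
		Use: "consumer",
		RunE: func(cmd *cobra.Command, args []string) error {
			allow, err := cmd.Flags().GetBool(lavasession.AllowGRPCCompressionFlag)
			if err != nil {
				return err
			}
			// Propagate into the toggle read by ConnectRawClientWithTimeout.
			lavasession.AllowGRPCCompressionForConsumerProviderCommunication = allow
			return nil
		},
	}
	cmd.Flags().Bool(lavasession.AllowGRPCCompressionFlag, false,
		"gzip-compress consumer<->provider gRPC traffic")
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}
```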
43 changes: 41 additions & 2 deletions protocol/metrics/metrics_consumer_manager.go
@@ -5,13 +5,26 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type LatencyTracker struct {
AverageLatency time.Duration // in nanoseconds (result of time.Since)
TotalRequests int
}

func (lt *LatencyTracker) AddLatency(latency time.Duration) {
lt.TotalRequests++
weight := 1.0 / float64(lt.TotalRequests)
// Calculate the weighted average of the current average latency and the new latency
lt.AverageLatency = time.Duration(float64(lt.AverageLatency)*(1-weight) + float64(latency)*weight)
}

type ConsumerMetricsManager struct {
totalCURequestedMetric *prometheus.CounterVec
totalRelaysRequestedMetric *prometheus.CounterVec
@@ -35,6 +48,8 @@ type ConsumerMetricsManager struct {
protocolVersionMetric *prometheus.GaugeVec
providerRelays map[string]uint64
addMethodsApiGauge bool
averageLatencyPerChain map[string]*LatencyTracker // key == chain Id + api interface
averageLatencyMetric *prometheus.GaugeVec
}

type ConsumerMetricsManagerOptions struct {
@@ -128,9 +143,13 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM

endpointsHealthChecksOkMetric.Set(1)
protocolVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_protocol_version",
Name: "lava_consumer_protocol_version",
Help: "The current running lavap version for the process. major := version / 1000000, minor := (version / 1000) % 1000, patch := version % 1000",
}, []string{"version"})
averageLatencyMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_average_latency_in_milliseconds",
Help: "average latency per chain id per api interface",
}, []string{"spec", "apiInterface"})
// Register the metrics with the Prometheus registry.
prometheus.MustRegister(totalCURequestedMetric)
prometheus.MustRegister(totalRelaysRequestedMetric)
@@ -151,6 +170,7 @@
prometheus.MustRegister(currentNumberOfOpenSessionsMetric)
prometheus.MustRegister(currentNumberOfBlockedSessionsMetric)
prometheus.MustRegister(apiSpecificsMetric)
prometheus.MustRegister(averageLatencyMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
@@ -163,10 +183,12 @@
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
averageLatencyPerChain: map[string]*LatencyTracker{},
virtualEpochMetric: virtualEpochMetric,
endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
averageLatencyMetric: averageLatencyMetric,
totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric,
totalRelaysReturnedFromProvidersMetric: totalRelaysReturnedFromProvidersMetric,
totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric,
@@ -273,7 +295,11 @@ func (pme *ConsumerMetricsManager) AddNumberOfBlockedSessionMetric(chainId strin
pme.currentNumberOfBlockedSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc()
}

func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64) {
func (pme *ConsumerMetricsManager) getKeyForAverageLatency(chainId string, apiInterface string) string {
return chainId + apiInterface
}

func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64, relayLatency time.Duration, sessionSuccessful bool) {
if pme == nil {
return
}
@@ -285,6 +311,19 @@ func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface st
// do not add Qos metrics there's another session with more statistics
return
}

// calculate average latency only for successful sessions, excluding hanging APIs (transactions etc.)
if sessionSuccessful {
averageLatencyKey := pme.getKeyForAverageLatency(chainId, apiInterface)
existingLatency, foundExistingLatency := pme.averageLatencyPerChain[averageLatencyKey]
if !foundExistingLatency {
pme.averageLatencyPerChain[averageLatencyKey] = &LatencyTracker{}
existingLatency = pme.averageLatencyPerChain[averageLatencyKey]
}
existingLatency.AddLatency(relayLatency)
pme.averageLatencyMetric.WithLabelValues(chainId, apiInterface).Set(float64(existingLatency.AverageLatency.Milliseconds()))
}

pme.LatestProviderRelay.WithLabelValues(chainId, providerAddress, apiInterface).SetToCurrentTime()
// update existing relays
pme.providerRelays[providerRelaysKey] = relays
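For reference, `LatencyTracker.AddLatency` maintains a standard cumulative moving average, `avg_n = avg_(n-1)*(n-1)/n + sample/n`, so no per-request history has to be stored. A small illustrative check of the arithmetic (import path assumed from the repo layout):

```go
package main

import (
	"fmt"
	"time"

	"github.com/lavanet/lava/protocol/metrics"
)

func main() {
	lt := &metrics.LatencyTracker{}
	lt.AddLatency(100 * time.Millisecond) // n=1: average = 100ms
	lt.AddLatency(200 * time.Millisecond) // n=2: 100*(1/2) + 200*(1/2) = 150ms
	lt.AddLatency(300 * time.Millisecond) // n=3: 150*(2/3) + 300*(1/3) = 200ms
	fmt.Println(lt.AverageLatency)        // 200ms, the true mean of all three samples
}
```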