diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go
index 8c80b3d799..0b12b6f22e 100644
--- a/protocol/integration/protocol_test.go
+++ b/protocol/integration/protocol_test.go
@@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
 func isGrpcServerUp(url string) bool {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
 	defer cancel()
-	conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true, false)
+	conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true, false, false)
 	if err != nil {
 		return false
 	}
diff --git a/protocol/lavasession/common.go b/protocol/lavasession/common.go
index 1d03fe9c60..5f8caf3b89 100644
--- a/protocol/lavasession/common.go
+++ b/protocol/lavasession/common.go
@@ -24,14 +24,15 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/encoding/gzip"
 )
 
 const (
 	MaxConsecutiveConnectionAttempts = 5
 	TimeoutForEstablishingAConnection = 1500 * time.Millisecond // 1.5 seconds
 	MaxSessionsAllowedPerProvider = 1000 // Max number of sessions allowed per provider
-	MaxAllowedBlockListedSessionPerProvider = 3
-	MaximumNumberOfFailuresAllowedPerConsumerSession = 3
+	MaxAllowedBlockListedSessionPerProvider = MaxSessionsAllowedPerProvider / 3
+	MaximumNumberOfFailuresAllowedPerConsumerSession = 15
 	RelayNumberIncrement = 1
 	DataReliabilitySessionId = 0 // data reliability session id is 0. we can change to more sessions later if needed.
 	DataReliabilityRelayNumber = 1
@@ -62,7 +63,7 @@ func IsSessionSyncLoss(err error) bool {
 	return code == codes.Code(SessionOutOfSyncError.ABCICode())
 }
 
-func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool, skipTLS bool) (*grpc.ClientConn, error) {
+func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool, skipTLS bool, allowCompression bool) (*grpc.ClientConn, error) {
 	var opts []grpc.DialOption
 
 	if skipTLS {
@@ -93,6 +94,13 @@ func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool,
 		}))
 	}
 
+	// allow gzip compression for grpc.
+	if allowCompression {
+		opts = append(opts, grpc.WithDefaultCallOptions(
+			grpc.UseCompressor(gzip.Name), // Use gzip compression for provider consumer communication
+		))
+	}
+
 	conn, err := grpc.DialContext(ctx, address, opts...)
 	return conn, err
 }
diff --git a/protocol/lavasession/common_test.go b/protocol/lavasession/common_test.go
index bf6e09898a..903fd8111c 100644
--- a/protocol/lavasession/common_test.go
+++ b/protocol/lavasession/common_test.go
@@ -1,11 +1,22 @@
 package lavasession
 
 import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"log"
+	"net"
 	"strings"
 	"testing"
+	"time"
 
+	"github.com/lavanet/lava/utils"
+	pairingtypes "github.com/lavanet/lava/x/pairing/types"
 	planstypes "github.com/lavanet/lava/x/plans/types"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/encoding/gzip"
 )
 
 type printGeos []*Endpoint
@@ -111,3 +122,93 @@ func TestGeoOrdering(t *testing.T) {
 		})
 	}
 }
+
+type RelayerConnectionServer struct {
+	pairingtypes.UnimplementedRelayerServer
+	guid uint64
+}
+
+func (rs *RelayerConnectionServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
+	return nil, fmt.Errorf("unimplemented")
+}
+
+func (rs *RelayerConnectionServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) {
+	// peerAddress := common.GetIpFromGrpcContext(ctx)
+	// utils.LavaFormatInfo("received probe", utils.LogAttr("incoming-ip", peerAddress))
+	return &pairingtypes.ProbeReply{
+		Guid: rs.guid,
+	}, nil
+}
+
+func (rs *RelayerConnectionServer) RelaySubscribe(request *pairingtypes.RelayRequest, srv pairingtypes.Relayer_RelaySubscribeServer) error {
+	return fmt.Errorf("unimplemented")
+}
+
+func startServer() (*grpc.Server, net.Listener) {
+	listen := ":0"
+	lis, err := net.Listen("tcp", listen)
+	if err != nil {
+		log.Fatalf("failed to listen: %v", err)
+	}
+	tlsConfig := GetTlsConfig(NetworkAddressData{})
+	srv := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
+	pairingtypes.RegisterRelayerServer(srv, &RelayerConnectionServer{})
+	go func() {
+		if err := srv.Serve(lis); err != nil {
+			log.Println("test finished:", err)
+		}
+	}()
+	return srv, lis
+}
+
+// Note that when testing locally, compression will probably be outperformed by the non-compressed path
+// due to the compression overhead, while global communication should benefit from the reduced latency.
+func BenchmarkGRPCServer(b *testing.B) {
+	srv, lis := startServer()
+	address := lis.Addr().String()
+	defer srv.Stop()
+	defer lis.Close()
+
+	csp := &ConsumerSessionsWithProvider{}
+	for {
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		_, _, err := csp.ConnectRawClientWithTimeout(ctx, address)
+		if err != nil {
+			utils.LavaFormatDebug("waiting for grpc server to launch")
+			continue
+		}
+		cancel()
+		break
+	}
+
+	runBenchmark := func(b *testing.B, opts ...grpc.DialOption) {
+		var tlsConf tls.Config
+		tlsConf.InsecureSkipVerify = true
+		credentials := credentials.NewTLS(&tlsConf)
+		opts = append(opts, grpc.WithTransportCredentials(credentials))
+		conn, err := grpc.DialContext(context.Background(), address, opts...)
+		if err != nil {
+			b.Fatalf("failed to dial server: %v", err)
+		}
+		defer conn.Close()
+
+		client := pairingtypes.NewRelayerClient(conn)
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			client.Probe(context.Background(), &pairingtypes.ProbeRequest{Guid: 125, SpecId: "EVMOS", ApiInterface: "jsonrpc"})
+		}
+	}
+
+	b.Run("WithoutCompression", func(b *testing.B) {
+		runBenchmark(b)
+	})
+
+	b.Run("WithCompression", func(b *testing.B) {
+		runBenchmark(b, grpc.WithDefaultCallOptions(
+			grpc.UseCompressor(gzip.Name), // Use gzip compression for outgoing messages
+		))
+	})
+
+	time.Sleep(3 * time.Second)
+}
diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go
index c80cdc0f2f..2fe8924a3a 100644
--- a/protocol/lavasession/consumer_session_manager.go
+++ b/protocol/lavasession/consumer_session_manager.go
@@ -885,7 +885,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
 	consumerSession.LatestRelayCu = 0 // making sure no one uses it in a wrong way
 	consecutiveErrors := uint64(len(consumerSession.ConsecutiveErrors))
 	parentConsumerSessionsWithProvider := consumerSession.Parent // must read this pointer before unlocking
-	csm.updateMetricsManager(consumerSession)
+	csm.updateMetricsManager(consumerSession, time.Duration(0), false)
 	// finished with consumerSession here can unlock.
 	consumerSession.Free(errorReceived) // we unlock before we change anything in the parent ConsumerSessionsWithProvider
 
@@ -961,13 +961,13 @@ func (csm *ConsumerSessionManager) OnSessionDone(
 	// calculate QoS
 	consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
 	go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
-	csm.updateMetricsManager(consumerSession)
+	csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
 	return nil
 }
 
 // updates QoS metrics for a provider
 // consumerSession should still be locked when accessing this method as it fetches information from the session it self
-func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession) {
+func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession, relayLatency time.Duration, sessionSuccessful bool) {
 	if csm.consumerMetricsManager == nil {
 		return
 	}
@@ -988,7 +988,7 @@
 
 	publicProviderAddress := consumerSession.Parent.PublicLavaAddress
 	go func() {
-		csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
+		csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful)
 		// in case we blocked the session add it to our block sessions metric
 		if blockedSession {
 			csm.consumerMetricsManager.AddNumberOfBlockedSessionMetric(chainId, apiInterface, publicProviderAddress)
diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go
index 764003a1e8..7eb124bdb8 100644
--- a/protocol/lavasession/consumer_session_manager_test.go
+++ b/protocol/lavasession/consumer_session_manager_test.go
@@ -26,7 +26,7 @@ import (
 const (
 	parallelGoRoutines = 40
 	numberOfProviders = 10
-	numberOfResetsToTest = 10
+	numberOfResetsToTest = 1
 	numberOfAllowedSessionsPerConsumer = 10
 	firstEpochHeight = 20
 	secondEpochHeight = 40
@@ -428,6 +428,8 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
 	ctx := context.Background()
 	csm := CreateConsumerSessionManager()
 	pairingList := createPairingList("", true)
+	// make the list shorter, otherwise we won't be able to ban them all as it takes slightly more time now
+	pairingList = map[uint64]*ConsumerSessionsWithProvider{0: pairingList[0]}
 	err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
 	require.NoError(t, err)
 
@@ -438,6 +440,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
 			break
 		}
 		css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
+		require.NoError(t, err)
 		for _, cs := range css {
 			err = csm.OnSessionFailure(cs.Session, nil)
@@ -811,7 +814,7 @@ func TestContext(t *testing.T) {
 
 func TestGrpcClientHang(t *testing.T) {
 	ctx := context.Background()
-	conn, err := ConnectGRPCClient(ctx, grpcListener, true, false)
+	conn, err := ConnectGRPCClient(ctx, grpcListener, true, false, false)
 	require.NoError(t, err)
 	client := pairingtypes.NewRelayerClient(conn)
 	err = conn.Close()
diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go
index 8b0d64814c..911fbb6b3b 100644
--- a/protocol/lavasession/consumer_types.go
+++ b/protocol/lavasession/consumer_types.go
@@ -38,9 +38,15 @@ func (list EndpointInfoList) Swap(i, j int) {
 	list[i], list[j] = list[j], list[i]
 }
 
-const AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
+const (
+	AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
+	AllowGRPCCompressionFlag = "allow-grpc-compression-for-consumer-provider-communication"
+)
 
-var AllowInsecureConnectionToProviders = false
+var (
+	AllowInsecureConnectionToProviders = false
+	AllowGRPCCompressionForConsumerProviderCommunication = false
+)
 
 type UsedProvidersInf interface {
 	RemoveUsed(providerAddress string, err error)
@@ -301,7 +307,7 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er
 func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) {
 	connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection)
 	defer cancel()
-	conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders, false)
+	conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders, false, AllowGRPCCompressionForConsumerProviderCommunication)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go
index 86195ca878..1d3816b9b2 100644
--- a/protocol/metrics/metrics_consumer_manager.go
+++ b/protocol/metrics/metrics_consumer_manager.go
@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/lavanet/lava/utils"
 	pairingtypes "github.com/lavanet/lava/x/pairing/types"
@@ -12,6 +13,18 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
+type LatencyTracker struct {
+	AverageLatency time.Duration // in nanoseconds (time.Since result)
+	TotalRequests int
+}
+
+func (lt *LatencyTracker) AddLatency(latency time.Duration) {
+	lt.TotalRequests++
+	weight := 1.0 / float64(lt.TotalRequests)
+	// Calculate the weighted average of the current average latency and the new latency
+	lt.AverageLatency = time.Duration(float64(lt.AverageLatency)*(1-weight) + float64(latency)*weight)
+}
+
 type ConsumerMetricsManager struct {
 	totalCURequestedMetric *prometheus.CounterVec
 	totalRelaysRequestedMetric *prometheus.CounterVec
@@ -35,6 +48,8 @@ type ConsumerMetricsManager struct {
 	protocolVersionMetric *prometheus.GaugeVec
 	providerRelays map[string]uint64
 	addMethodsApiGauge bool
+	averageLatencyPerChain map[string]*LatencyTracker // key == chain ID + api interface
+	averageLatencyMetric *prometheus.GaugeVec
 }
 
 type ConsumerMetricsManagerOptions struct {
@@ -128,9 +143,13 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
 	endpointsHealthChecksOkMetric.Set(1)
 
 	protocolVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "lava_provider_protocol_version",
+		Name: "lava_consumer_protocol_version",
 		Help: "The current running lavap version for the process. major := version / 1000000, minor := (version / 1000) % 1000, patch := version % 1000",
 	}, []string{"version"})
+	averageLatencyMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_average_latency_in_milliseconds",
+		Help: "average latency per chain id per api interface",
+	}, []string{"spec", "apiInterface"})
 	// Register the metrics with the Prometheus registry.
 	prometheus.MustRegister(totalCURequestedMetric)
 	prometheus.MustRegister(totalRelaysRequestedMetric)
@@ -151,6 +170,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
 	prometheus.MustRegister(currentNumberOfOpenSessionsMetric)
 	prometheus.MustRegister(currentNumberOfBlockedSessionsMetric)
 	prometheus.MustRegister(apiSpecificsMetric)
+	prometheus.MustRegister(averageLatencyMetric)
 
 	consumerMetricsManager := &ConsumerMetricsManager{
 		totalCURequestedMetric: totalCURequestedMetric,
@@ -163,10 +183,12 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
 		LatestBlockMetric: latestBlockMetric,
 		LatestProviderRelay: latestProviderRelay,
 		providerRelays: map[string]uint64{},
+		averageLatencyPerChain: map[string]*LatencyTracker{},
 		virtualEpochMetric: virtualEpochMetric,
 		endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
 		endpointsHealthChecksOk: 1,
 		protocolVersionMetric: protocolVersionMetric,
+		averageLatencyMetric: averageLatencyMetric,
 		totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric,
 		totalRelaysReturnedFromProvidersMetric: totalRelaysReturnedFromProvidersMetric,
 		totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric,
@@ -273,7 +295,11 @@ func (pme *ConsumerMetricsManager) AddNumberOfBlockedSessionMetric(chainId strin
 	pme.currentNumberOfBlockedSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc()
 }
 
-func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64) {
+func (pme *ConsumerMetricsManager) getKeyForAverageLatency(chainId string, apiInterface string) string {
+	return chainId + apiInterface
+}
+
+func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64, relayLatency time.Duration, sessionSuccessful bool) {
 	if pme == nil {
 		return
 	}
@@ -285,6 +311,19 @@ func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface st
 		// do not add Qos metrics there's another session with more statistics
 		return
 	}
+
+	// calculate average latency on successful sessions only, and not on hanging apis (transactions etc.)
+	if sessionSuccessful {
+		averageLatencyKey := pme.getKeyForAverageLatency(chainId, apiInterface)
+		existingLatency, foundExistingLatency := pme.averageLatencyPerChain[averageLatencyKey]
+		if !foundExistingLatency {
+			pme.averageLatencyPerChain[averageLatencyKey] = &LatencyTracker{}
+			existingLatency = pme.averageLatencyPerChain[averageLatencyKey]
+		}
+		existingLatency.AddLatency(relayLatency)
+		pme.averageLatencyMetric.WithLabelValues(chainId, apiInterface).Set(float64(existingLatency.AverageLatency.Milliseconds()))
+	}
+
 	pme.LatestProviderRelay.WithLabelValues(chainId, providerAddress, apiInterface).SetToCurrentTime()
 	// update existing relays
 	pme.providerRelays[providerRelaysKey] = relays
diff --git a/protocol/metrics/metrics_consumer_manager_test.go b/protocol/metrics/metrics_consumer_manager_test.go
new file mode 100644
index 0000000000..1fb0f44f13
--- /dev/null
+++ b/protocol/metrics/metrics_consumer_manager_test.go
@@ -0,0 +1,68 @@
+package metrics
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestAddLatency(t *testing.T) {
+	lt := LatencyTracker{}
+
+	// Add some latencies
+	latencies := []time.Duration{time.Millisecond * 100, time.Millisecond * 200, time.Millisecond * 300}
+	expectedAverageLatencies := []time.Duration{time.Millisecond * 100, time.Millisecond * 150, time.Millisecond * 200}
+
+	for i, latency := range latencies {
+		lt.AddLatency(latency)
+		fmt.Printf("Average Latency after adding %v: %v\n", latency, lt.AverageLatency)
+		if lt.AverageLatency != expectedAverageLatencies[i] {
+			t.Errorf("Expected average latency %v, got %v", expectedAverageLatencies[i], lt.AverageLatency)
+		}
+	}
+
+	// Test zero TotalRequests
+	lt2 := LatencyTracker{}
+	if lt2.AverageLatency != 0 {
+		t.Errorf("Expected average latency 0, got %v", lt2.AverageLatency)
+	}
+}
+
+func TestAddLatencyNanoSeconds(t *testing.T) {
+	lt := LatencyTracker{}
+
+	// Add some latencies
+	latencies := []time.Duration{}
+	timeNow := time.Now()
+	time.Sleep(101 * time.Millisecond)
+	latencies = append(latencies, time.Since(timeNow))
+	time.Sleep(101 * time.Millisecond)
+	latencies = append(latencies, time.Since(timeNow))
+	time.Sleep(101 * time.Millisecond)
+	latencies = append(latencies, time.Since(timeNow))
+
+	for i, latency := range latencies {
+		lt.AddLatency(latency)
+		fmt.Printf("Average Latency after adding %v: %v\n", latency, lt.AverageLatency)
+		if lt.AverageLatency.Milliseconds() < int64(100*i) {
+			t.Errorf("Expected average latency %v < %v", int64(100*i), lt.AverageLatency)
+		}
+	}
+
+	// Test zero TotalRequests
+	lt2 := LatencyTracker{}
+	if lt2.AverageLatency != 0 {
+		t.Errorf("Expected average latency 0, got %v", lt2.AverageLatency)
+	}
+}
+
+func TestAddLatencyWithZeroTotalRequests(t *testing.T) {
+	lt := LatencyTracker{}
+
+	// Adding latency without incrementing TotalRequests
+	lt.AddLatency(time.Millisecond * 100)
+
+	if lt.AverageLatency != time.Millisecond*100 {
+		t.Errorf("Expected average latency %v, got %v", time.Millisecond*100, lt.AverageLatency)
+	}
+}
diff --git a/protocol/performance/cache.go b/protocol/performance/cache.go
index a5a46e76ab..fcb6adb6fa 100644
--- a/protocol/performance/cache.go
+++ b/protocol/performance/cache.go
@@ -17,7 +17,7 @@ func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string
 	connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
 	defer cancel()
 
-	conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true)
+	conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true, false)
 	if err != nil {
 		return nil, err
 	}
diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go
index ece61ef9c2..976fe6b13b 100644
--- a/protocol/rpcconsumer/rpcconsumer.go
+++ b/protocol/rpcconsumer/rpcconsumer.go
@@ -399,6 +399,10 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
 			if lavasession.AllowInsecureConnectionToProviders {
 				utils.LavaFormatWarning("AllowInsecureConnectionToProviders is set to true, this should be used only in development", nil, utils.Attribute{Key: lavasession.AllowInsecureConnectionToProvidersFlag, Value: lavasession.AllowInsecureConnectionToProviders})
 			}
+			lavasession.AllowGRPCCompressionForConsumerProviderCommunication = viper.GetBool(lavasession.AllowGRPCCompressionFlag)
+			if lavasession.AllowGRPCCompressionForConsumerProviderCommunication {
+				utils.LavaFormatInfo("AllowGRPCCompressionForConsumerProviderCommunication is set to true, messages will be compressed", utils.Attribute{Key: lavasession.AllowGRPCCompressionFlag, Value: lavasession.AllowGRPCCompressionForConsumerProviderCommunication})
+			}
 
 			var rpcEndpoints []*lavasession.RPCEndpoint
 			var viper_endpoints *viper.Viper
@@ -545,6 +549,7 @@
 	cmdRPCConsumer.MarkFlagRequired(common.GeolocationFlag)
 	cmdRPCConsumer.Flags().Bool("secure", false, "secure sends reliability on every message")
 	cmdRPCConsumer.Flags().Bool(lavasession.AllowInsecureConnectionToProvidersFlag, false, "allow insecure provider-dialing. used for development and testing")
+	cmdRPCConsumer.Flags().Bool(lavasession.AllowGRPCCompressionFlag, false, "allow messages to be compressed when communicating between the consumer and provider")
 	cmdRPCConsumer.Flags().Bool(common.TestModeFlagName, false, "test mode causes rpcconsumer to send dummy data and print all of the metadata in it's listeners")
 	cmdRPCConsumer.Flags().String(performance.PprofAddressFlagName, "", "pprof server address, used for code profiling")
 	cmdRPCConsumer.Flags().String(performance.CacheFlagName, "", "address for a cache server to improve performance")
diff --git a/protocol/rpcprovider/provider_listener.go b/protocol/rpcprovider/provider_listener.go
index f5bbd0e8c7..0817c2a3af 100644
--- a/protocol/rpcprovider/provider_listener.go
+++ b/protocol/rpcprovider/provider_listener.go
@@ -61,8 +61,10 @@ func NewProviderListener(ctx context.Context, networkAddress lavasession.Network
 
 	// GRPC
 	lis := chainlib.GetListenerWithRetryGrpc("tcp", networkAddress.Address)
-	serverReceiveMaxMessageSize := grpc.MaxRecvMsgSize(1024 * 1024 * 32) // setting receive size to 32mb instead of 4mb default
-	grpcServer := grpc.NewServer(serverReceiveMaxMessageSize)
+	opts := []grpc.ServerOption{
+		grpc.MaxRecvMsgSize(1024 * 1024 * 32), // setting receive size to 32mb instead of 4mb default
+	}
+	grpcServer := grpc.NewServer(opts...)
 	wrappedServer := grpcweb.WrapServer(grpcServer)
 	handler := func(resp http.ResponseWriter, req *http.Request) {
diff --git a/protocol/rpcprovider/rpcprovider_server_test.go b/protocol/rpcprovider/rpcprovider_server_test.go
index 97131e2654..092c46339c 100644
--- a/protocol/rpcprovider/rpcprovider_server_test.go
+++ b/protocol/rpcprovider/rpcprovider_server_test.go
@@ -253,7 +253,7 @@ func TestHandleConsistency(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), play.timeout)
 			latestBlock, _, timeSlept, err := rpcproviderServer.handleConsistency(ctx, play.timeout, seenBlock, requestBlock, averageBlockTime, blockLagForQosSync, blocksInFinalizationData, blockDistanceToFinalization)
 			cancel()
-			require.Equal(t, play.err == nil, err == nil, err, strconv.Itoa(calls))
+			require.Equal(t, play.err == nil, err == nil, strconv.Itoa(calls))
 			require.Less(t, timeSlept, play.timeout)
 			if play.sleep {
 				require.NotZero(t, timeSlept)