From 41b3b3a01b7ac11a4daaf845468342c6bcb15889 Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Thu, 2 May 2024 13:42:00 +0200 Subject: [PATCH] feat: PRT-get-best-endpoint-csm (#1396) * finally fetching all endpoints and also sorting them by best latency so later we get best endpoint first * adding sort mechanism * changing func name * changing func name * adding testServer that responds to Probing, with a builtin Delay. also adding another test to test the Endpoint sorting mechanism. * renaming lint * improving swap skills * I wonder if anyone will ever see this commit message. * changing error for not implemented * adding error protection --- protocol/integration/protocol_test.go | 2 +- protocol/lavasession/common.go | 2 +- .../lavasession/consumer_session_manager.go | 101 ++++++---- .../consumer_session_manager_test.go | 175 +++++++++++++----- protocol/lavasession/consumer_types.go | 76 +++++++- 5 files changed, 261 insertions(+), 95 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index cfccbce52f..580af7e6a2 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -54,7 +54,7 @@ func TestMain(m *testing.M) { func isGrpcServerUp(url string) bool { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50) defer cancel() - conn, err := lavasession.ConnectgRPCClient(context.Background(), url, true) + conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true) if err != nil { return false } diff --git a/protocol/lavasession/common.go b/protocol/lavasession/common.go index add8cdcccb..90794fe5ee 100644 --- a/protocol/lavasession/common.go +++ b/protocol/lavasession/common.go @@ -58,7 +58,7 @@ func IsSessionSyncLoss(err error) bool { return code == codes.Code(SessionOutOfSyncError.ABCICode()) } -func ConnectgRPCClient(ctx context.Context, address string, allowInsecure bool) (*grpc.ClientConn, error) { +func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool) (*grpc.ClientConn, error) { var tlsConf tls.Config if allowInsecure { tlsConf.InsecureSkipVerify = true // this will allow us to use self signed certificates in development. diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index ee6600ed48..eefc086a0b 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -2,6 +2,7 @@ package lavasession import ( "context" + "fmt" "sort" "strings" "sync" @@ -196,8 +197,7 @@ func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingLi // this code needs to be thread safe func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSessionsWithProvider *ConsumerSessionsWithProvider, epoch uint64, tryReconnectToDisabledEndpoints bool) (latency time.Duration, providerAddress string, err error) { - // TODO: fetch all endpoints not just one - connected, endpoint, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, tryReconnectToDisabledEndpoints) + connected, endpoints, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, tryReconnectToDisabledEndpoints, true) if err != nil || !connected { if AllProviderEndpointsDisabledError.Is(err) { csm.blockProvider(providerAddress, true, epoch, MaxConsecutiveConnectionAttempts, 0, csm.GenerateReconnectCallback(consumerSessionsWithProvider)) // reporting and blocking provider this epoch @@ -205,43 +205,65 @@ func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSe return 0, providerAddress, err } - relaySentTime := time.Now() - connectCtx, cancel := context.WithTimeout(ctx, common.AverageWorldLatency) - defer cancel() - guid, found := utils.GetUniqueIdentifier(connectCtx) - if !found { - return 0, providerAddress, utils.LavaFormatError("probeProvider failed fetching unique identifier from context when it's set", nil) - } - if endpoint.Client == nil { - consumerSessionsWithProvider.Lock.Lock() - defer consumerSessionsWithProvider.Lock.Unlock() - return 0, providerAddress, utils.LavaFormatError("returned nil client in endpoint", nil, utils.Attribute{Key: "consumerSessionWithProvider", Value: consumerSessionsWithProvider}) - } - client := *endpoint.Client - probeReq := &pairingtypes.ProbeRequest{ - Guid: guid, - SpecId: csm.rpcEndpoint.ChainID, - ApiInterface: csm.rpcEndpoint.ApiInterface, - } - var trailer metadata.MD - probeResp, err := client.Probe(connectCtx, probeReq, grpc.Trailer(&trailer)) - versions := trailer.Get(common.VersionMetadataKey) - relayLatency := time.Since(relaySentTime) - if err != nil { - return 0, providerAddress, utils.LavaFormatError("probe call error", err, utils.Attribute{Key: "provider", Value: providerAddress}) - } - providerGuid := probeResp.GetGuid() - if providerGuid != guid { - return 0, providerAddress, utils.LavaFormatWarning("mismatch probe response", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "provider Guid", Value: providerGuid}, utils.Attribute{Key: "sent guid", Value: guid}) - } - if probeResp.LatestBlock == 0 { - return 0, providerAddress, utils.LavaFormatWarning("provider returned 0 latest block", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "sent guid", Value: guid}) + var endpointInfos []EndpointInfo + lastError := fmt.Errorf("endpoints list is empty") // this error will happen if we had 0 endpoints + for _, endpoint := range endpoints { + err := func() error { + connectCtx, cancel := context.WithTimeout(ctx, common.AverageWorldLatency) + defer cancel() + guid, found := utils.GetUniqueIdentifier(connectCtx) + if !found { + return utils.LavaFormatError("probeProvider failed fetching unique identifier from context when it's set", nil) + } + if endpoint.Client == nil { + consumerSessionsWithProvider.Lock.Lock() + defer consumerSessionsWithProvider.Lock.Unlock() + return utils.LavaFormatError("returned nil client in endpoint", nil, utils.Attribute{Key: "consumerSessionWithProvider", Value: consumerSessionsWithProvider}) + } + client := *endpoint.Client + probeReq := &pairingtypes.ProbeRequest{ + Guid: guid, + SpecId: csm.rpcEndpoint.ChainID, + ApiInterface: csm.rpcEndpoint.ApiInterface, + } + var trailer metadata.MD + relaySentTime := time.Now() + probeResp, err := client.Probe(connectCtx, probeReq, grpc.Trailer(&trailer)) + relayLatency := time.Since(relaySentTime) + versions := trailer.Get(common.VersionMetadataKey) + if err != nil { + return utils.LavaFormatError("probe call error", err, utils.Attribute{Key: "provider", Value: providerAddress}) + } + providerGuid := probeResp.GetGuid() + if providerGuid != guid { + return utils.LavaFormatWarning("mismatch probe response", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "provider Guid", Value: providerGuid}, utils.Attribute{Key: "sent guid", Value: guid}) + } + if probeResp.LatestBlock == 0 { + return utils.LavaFormatWarning("provider returned 0 latest block", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "sent guid", Value: guid}) + } + + endpointInfos = append(endpointInfos, EndpointInfo{ + Latency: relayLatency, + Endpoint: endpoint, + }) + // public lava address is a value that is not changing, so it's thread safe + if DebugProbes { + utils.LavaFormatDebug("Probed provider successfully", utils.Attribute{Key: "latency", Value: relayLatency}, utils.Attribute{Key: "provider", Value: consumerSessionsWithProvider.PublicLavaAddress}, utils.LogAttr("version", strings.Join(versions, ","))) + } + return nil + }() + if err != nil { + lastError = err + } } - // public lava address is a value that is not changing, so it's thread safe - if DebugProbes { - utils.LavaFormatDebug("Probed provider successfully", utils.Attribute{Key: "latency", Value: relayLatency}, utils.Attribute{Key: "provider", Value: consumerSessionsWithProvider.PublicLavaAddress}, utils.LogAttr("version", strings.Join(versions, ","))) + + if len(endpointInfos) == 0 { + // no endpoints. + return 0, providerAddress, lastError } - return relayLatency, providerAddress, nil + sort.Sort(EndpointInfoList(endpointInfos)) + consumerSessionsWithProvider.sortEndpointsByLatency(endpointInfos) + return endpointInfos[0].Latency, providerAddress, nil } // csm needs to be locked here @@ -375,7 +397,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS sessionEpoch := sessionWithProvider.CurrentEpoch // Get a valid Endpoint from the provider chosen - connected, endpoint, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false) + connected, endpoints, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false, false) if err != nil { // verify err is AllProviderEndpointsDisabled and report. if AllProviderEndpointsDisabledError.Is(err) { @@ -397,6 +419,9 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS continue } + // get the endpoint we got, as its the only one returned when asking fetchEndpointConnectionFromConsumerSessionWithProvider with false value + endpoint := endpoints[0] + // we get the reported providers here after we try to connect, so if any provider didn't respond he will already be added to the list. reportedProviders := csm.GetReportedProviders(sessionEpoch) diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index f958ed38e8..5bdb02db19 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -40,31 +40,133 @@ const ( maxCuForVirtualEpoch = uint64(200) ) +type testServer struct { + delay time.Duration +} + +func (rpcps *testServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) { + utils.LavaFormatDebug("Debug probe called") + probeReply := &pairingtypes.ProbeReply{ + Guid: probeReq.GetGuid(), + LatestBlock: 1, + FinalizedBlocksHashes: []byte{}, + LavaEpoch: 1, + LavaLatestBlock: 1, + } + time.Sleep(rpcps.delay) + return probeReply, nil +} + +func (rpcps *testServer) Relay(context.Context, *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { + return nil, utils.LavaFormatError("not Implemented", nil) +} + +func (rpcps *testServer) RelaySubscribe(*pairingtypes.RelayRequest, pairingtypes.Relayer_RelaySubscribeServer) error { + return utils.LavaFormatError("not implemented", nil) +} + +// Test the basic functionality of the consumerSessionManager +func TestHappyFlow(t *testing.T) { + ctx := context.Background() + csm := CreateConsumerSessionManager() + pairingList := createPairingList("", true) + err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. + require.NoError(t, err) + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session + require.NoError(t, err) + + for _, cs := range css { + require.NotNil(t, cs) + require.Equal(t, cs.Epoch, csm.currentEpoch) + require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest) + err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false) + require.NoError(t, err) + require.Equal(t, cs.Session.CuSum, cuForFirstRequest) + require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone) + require.Equal(t, cs.Session.RelayNum, relayNumberAfterFirstCall) + require.Equal(t, cs.Session.LatestBlock, servicedBlockNumber) + } +} + +func getDelayedAddress() string { + delayedServerAddress := "127.0.0.1:3335" + // because grpcListener is random we might have overlap. in that case just change the port. + if grpcListener == delayedServerAddress { + delayedServerAddress = "127.0.0.1:3336" + } + return delayedServerAddress +} + +func TestEndpointSortingFlow(t *testing.T) { + delayedAddress := getDelayedAddress() + err := createGRPCServer(delayedAddress, time.Millisecond) + csp := &ConsumerSessionsWithProvider{} + for { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, _, err := csp.ConnectRawClientWithTimeout(ctx, delayedAddress) + if err != nil { + utils.LavaFormatDebug("waiting for grpc server to launch") + continue + } + cancel() + break + } + require.NoError(t, err) + csm := CreateConsumerSessionManager() + pairingList := createPairingList("", true) + pairingList[0].Endpoints = append(pairingList[0].Endpoints, &Endpoint{NetworkAddress: delayedAddress, Enabled: true, Client: nil, ConnectionRefusals: 0}) + // swap locations so that the endpoint of the delayed will be first + pairingList[0].Endpoints[0], pairingList[0].Endpoints[1] = pairingList[0].Endpoints[1], pairingList[0].Endpoints[0] + + // update the pairing and wait for the routine to send all requests + err = csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. + require.NoError(t, err) + + _, ok := csm.pairing[pairingList[0].PublicLavaAddress] + require.True(t, ok) + + // because probing is in a routine we need to wait for the sorting and probing to end asynchronously + swapped := false + for i := 0; i < 10; i++ { + if pairingList[0].Endpoints[0].NetworkAddress == grpcListener { + fmt.Println("Endpoints Are Sorted!", i) + swapped = true + break + } + time.Sleep(1 * time.Second) + fmt.Println("Endpoints did not swap yet, attempt:", i) + } + require.True(t, swapped) + // after creating all the sessions +} + // This variable will hold grpc server address var grpcListener = "localhost:0" func CreateConsumerSessionManager() *ConsumerSessionManager { - AllowInsecureConnectionToProviders = true // set to allow insecure for tests purposes rand.InitRandomSeed() baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil) } -var grpcServer *grpc.Server - func TestMain(m *testing.M) { - serverStarted := make(chan struct{}) - - go func() { - err := createGRPCServer(serverStarted) + AllowInsecureConnectionToProviders = true + err := createGRPCServer("", time.Microsecond) + if err != nil { + fmt.Println("Failed create server", err) + os.Exit(-1) + } + csp := &ConsumerSessionsWithProvider{} + for { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, _, err := csp.ConnectRawClientWithTimeout(ctx, grpcListener) if err != nil { - fmt.Printf("Failed to start server: %v\n", err) - os.Exit(1) + utils.LavaFormatDebug("waiting for grpc server to launch") + continue } - }() - - // Wait for server to start - <-serverStarted + cancel() + break + } // Start running tests. code := m.Run() @@ -72,31 +174,33 @@ func TestMain(m *testing.M) { os.Exit(code) } -func createGRPCServer(serverStarted chan struct{}) error { - if grpcServer != nil { - close(serverStarted) - return nil +func createGRPCServer(changeListener string, probeDelay time.Duration) error { + listenAddress := grpcListener + if changeListener != "" { + listenAddress = changeListener } - lis, err := net.Listen("tcp", grpcListener) + lis, err := net.Listen("tcp", listenAddress) if err != nil { return err } - // Update the grpcListener with the actual address - grpcListener = lis.Addr().String() + if changeListener == "" { + grpcListener = lis.Addr().String() + } // Create a new server with insecure credentials tlsConfig := GetTlsConfig(NetworkAddressData{}) s := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) + s2 := &testServer{delay: probeDelay} + pairingtypes.RegisterRelayerServer(s, s2) + go func() { if err := s.Serve(lis); err != nil { log.Fatalf("Failed to serve: %v", err) + os.Exit(-1) } }() - - grpcServer = s - close(serverStarted) // Signal that the server has started return nil } @@ -134,29 +238,6 @@ func createPairingList(providerPrefixAddress string, enabled bool) map[uint64]*C return cswpList } -// Test the basic functionality of the consumerSessionManager -func TestHappyFlow(t *testing.T) { - ctx := context.Background() - csm := CreateConsumerSessionManager() - pairingList := createPairingList("", true) - err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. - require.NoError(t, err) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session - require.NoError(t, err) - - for _, cs := range css { - require.NotNil(t, cs) - require.Equal(t, cs.Epoch, csm.currentEpoch) - require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest) - err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false) - require.NoError(t, err) - require.Equal(t, cs.Session.CuSum, cuForFirstRequest) - require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone) - require.Equal(t, cs.Session.RelayNum, relayNumberAfterFirstCall) - require.Equal(t, cs.Session.LatestBlock, servicedBlockNumber) - } -} - func TestNoPairingAvailableFlow(t *testing.T) { ctx := context.Background() csm := CreateConsumerSessionManager() @@ -718,7 +799,7 @@ func TestContext(t *testing.T) { func TestGrpcClientHang(t *testing.T) { ctx := context.Background() - conn, err := ConnectgRPCClient(ctx, grpcListener, true) + conn, err := ConnectGRPCClient(ctx, grpcListener, true) require.NoError(t, err) client := pairingtypes.NewRelayerClient(conn) err = conn.Close() diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 639e99d7e5..2e5a236836 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -17,6 +17,27 @@ import ( "google.golang.org/grpc/connectivity" ) +type EndpointInfo struct { + Latency time.Duration + Endpoint *Endpoint +} + +// Slice to hold EndpointInfo +type EndpointInfoList []EndpointInfo + +// Implement sort.Interface for EndpointInfoList +func (list EndpointInfoList) Len() int { + return len(list) +} + +func (list EndpointInfoList) Less(i, j int) bool { + return list[i].Latency < list[j].Latency +} + +func (list EndpointInfoList) Swap(i, j int) { + list[i], list[j] = list[j], list[i] +} + const AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing" var AllowInsecureConnectionToProviders = false @@ -277,7 +298,7 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) { connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection) defer cancel() - conn, err := ConnectgRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders) + conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders) if err != nil { return nil, nil, err } @@ -351,13 +372,43 @@ func (cswp *ConsumerSessionsWithProvider) GetConsumerSessionInstanceFromEndpoint return consumerSession, cswp.PairingEpoch, nil } +func (cswp *ConsumerSessionsWithProvider) sortEndpointsByLatency(endpointInfos []EndpointInfo) { + cswp.Lock.Lock() + defer cswp.Lock.Unlock() + + // validate we do not overflow no matter what. + if len(endpointInfos) > len(cswp.Endpoints) { + utils.LavaFormatError("Not suppose to have larger endpointInfos length than cswp.Endpoints length", nil, utils.LogAttr("endpointInfos", endpointInfos), utils.LogAttr("cswp.Endpoints", cswp.Endpoints)) + return + } + + // endpoint infos are already sorted by the best latency endpoint + for idx, endpoint := range endpointInfos { + // find the endpoint, and swap if indexes do not match expected by latency + for cswpEndpointIdx, cswpEndpoint := range cswp.Endpoints { + if cswpEndpoint.NetworkAddress == endpoint.Endpoint.NetworkAddress { + // found endpoint check the index location matches the order of best endpoints + if cswpEndpointIdx == idx { + break + } else { + // we need to swap the indexes of the endpoints. + tmpEndpoint := cswp.Endpoints[idx] + cswp.Endpoints[idx] = endpoint.Endpoint + cswp.Endpoints[cswpEndpointIdx] = tmpEndpoint + break + } + } + } + } +} + // fetching an endpoint from a ConsumerSessionWithProvider and establishing a connection, // can fail without an error if trying to connect once to each endpoint but none of them are active. -func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSessionWithProvider(ctx context.Context, retryDisabledEndpoints bool) (connected bool, endpointPtr *Endpoint, providerAddress string, err error) { - getConnectionFromConsumerSessionsWithProvider := func(ctx context.Context) (connected bool, endpointPtr *Endpoint, allDisabled bool) { +func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSessionWithProvider(ctx context.Context, retryDisabledEndpoints bool, getAllEndpoints bool) (connected bool, endpointsList []*Endpoint, providerAddress string, err error) { + getConnectionFromConsumerSessionsWithProvider := func(ctx context.Context) (connected bool, endpointPtr []*Endpoint, allDisabled bool) { + endpoints := make([]*Endpoint, 0) cswp.Lock.Lock() defer cswp.Lock.Unlock() - for idx, endpoint := range cswp.Endpoints { // retryDisabledEndpoints will attempt to reconnect to the provider even though we have disabled the endpoint // this is used on a routine that tries to reconnect to a provider that has been disabled due to being unable to connect to it. @@ -408,7 +459,16 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes } cswp.Endpoints[idx] = endpoint cswp.Endpoints[idx].Enabled = true // return enabled once we successfully reconnect - return true, endpoint, false + // successful connection add to endpoints list + endpoints = append(endpoints, endpoint) + if !getAllEndpoints { + return true, endpoints, false + } + } + + // if we managed to get at least one endpoint we can return the list of active endpoints + if len(endpoints) > 0 { + return true, endpoints, false } // checking disabled endpoints, as we can disable an endpoint mid run of the previous loop, we should re test the current endpoint state @@ -425,14 +485,14 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes } var allDisabled bool - connected, endpointPtr, allDisabled = getConnectionFromConsumerSessionsWithProvider(ctx) + connected, endpointsList, allDisabled = getConnectionFromConsumerSessionsWithProvider(ctx) if allDisabled { utils.LavaFormatInfo("purging provider after all endpoints are disabled", utils.Attribute{Key: "provider endpoints", Value: cswp.Endpoints}, utils.Attribute{Key: "provider address", Value: cswp.PublicLavaAddress}) // report provider. - return connected, endpointPtr, cswp.PublicLavaAddress, AllProviderEndpointsDisabledError + return connected, endpointsList, cswp.PublicLavaAddress, AllProviderEndpointsDisabledError } - return connected, endpointPtr, cswp.PublicLavaAddress, nil + return connected, endpointsList, cswp.PublicLavaAddress, nil } func CalculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, scaledAvailabilityScoreRet sdk.Dec) {