feat: PRT adding errored and blocked providers to headers and fixing archive issue #1459

Merged
Merged 35 commits into main from PRT-adding-errored-and-blocked-providers-to-headers on Jun 5, 2024

Commits (35)
90db93d
PRT-adding-errored-and-blocked-providers-to-headers
ranlavanet May 30, 2024
691ff48
changing log level from info to debug on spammy logs
ranlavanet May 30, 2024
263df9a
fixing protocol test
ranlavanet Jun 2, 2024
e3002df
removing remove extensions flow from the csm. managing it on rpcconsumer
ranlavanet Jun 2, 2024
d5c538e
removing deprecated types
ranlavanet Jun 2, 2024
01a2fb0
adding new flag to determine is session degradation is allowed
ranlavanet Jun 2, 2024
f45ad67
extension disabling flow is now on rpc consumer server which makes mo…
ranlavanet Jun 2, 2024
64a0130
adding more info when failing to fetch pairing.
ranlavanet Jun 2, 2024
cac2206
Merge branch 'main' into PRT-adding-errored-and-blocked-providers-to-…
ranlavanet Jun 2, 2024
a99c3c2
fix test
ranlavanet Jun 2, 2024
c007eba
fix test
ranlavanet Jun 2, 2024
df3fe7c
adding comment
ranlavanet Jun 2, 2024
7960535
register for pairing updates now requires chain id
ranlavanet Jun 2, 2024
3a490ba
exposing spec id
ranlavanet Jun 2, 2024
8af1a0a
registering emergency tracker to all chains
ranlavanet Jun 2, 2024
42bea10
finalization consensus updater spec id
ranlavanet Jun 2, 2024
c0544d4
spec id to pairing updater
ranlavanet Jun 2, 2024
5d397cf
get pairing does not allow empty anymore.
ranlavanet Jun 2, 2024
53c5ff0
fix comment
ranlavanet Jun 2, 2024
f7c6fc8
rename
ranlavanet Jun 2, 2024
617a659
making error more robust
ranlavanet Jun 2, 2024
de9427a
a
ranlavanet Jun 2, 2024
92816db
setting timeout for get pairing
ranlavanet Jun 2, 2024
2c33413
chaning log to warning and making it more clear
ranlavanet Jun 2, 2024
602a078
retry on register pairing.
ranlavanet Jun 2, 2024
1e993b1
robustness overload
ranlavanet Jun 2, 2024
6f4f511
..
ranlavanet Jun 2, 2024
b37d044
fixing spec for cosmoshub
ranlavanet Jun 3, 2024
1a6fae8
fixing grpc header issue on response.
ranlavanet Jun 3, 2024
b1b2275
Merge branch 'main' into PRT-adding-errored-and-blocked-providers-to-…
ranlavanet Jun 3, 2024
be2d01d
fix test
ranlavanet Jun 3, 2024
64a3c14
lava debug
ranlavanet Jun 3, 2024
1c94b8d
fixing name
ranlavanet Jun 5, 2024
2d0d9db
fixing small bugs
ranlavanet Jun 5, 2024
342fb2f
lint
ranlavanet Jun 5, 2024
6 changes: 4 additions & 2 deletions cookbook/specs/cosmossdk.json
@@ -2176,7 +2176,8 @@
},
{
"name": "grpc-metadata-x-cosmos-block-height",
"kind": "pass_both"
"kind": "pass_both",
"function_tag": "SET_LATEST_IN_METADATA"
}
],
"inheritance_apis": [],
@@ -4688,7 +4689,8 @@
"headers": [
{
"name": "x-cosmos-block-height",
"kind": "pass_both"
"kind": "pass_both",
"function_tag": "SET_LATEST_IN_METADATA"
},
{
"name": "grpc-metadata-x-cosmos-block-height",
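
For context on the spec change above: Cosmos SDK gRPC queries read an "x-cosmos-block-height" metadata header to pin a query to a specific height, which is the header these spec entries now tag with SET_LATEST_IN_METADATA. The short Go sketch below only shows how such a header is attached to outgoing gRPC metadata; it is illustrative and not taken from this PR, and the height value is a placeholder.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Attach the block-height header to the outgoing gRPC context.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "x-cosmos-block-height", "2176000")

	md, _ := metadata.FromOutgoingContext(ctx)
	fmt.Println(md) // map[x-cosmos-block-height:[2176000]]
}
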
16 changes: 9 additions & 7 deletions protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go
@@ -44,14 +44,16 @@ func (jm GrpcMessage) CheckResponseError(data []byte, httpStatusCode int) (hasEr
// GetParams will be deprecated after we remove old client
// Currently needed because of parser.RPCInput interface
func (gm GrpcMessage) GetParams() interface{} {
if gm.Msg[0] == '{' || gm.Msg[0] == '[' {
var parsedData interface{}
err := json.Unmarshal(gm.Msg, &parsedData)
if err != nil {
utils.LavaFormatError("failed to unmarshal GetParams", err)
return nil
if len(gm.Msg) > 0 {
if gm.Msg[0] == '{' || gm.Msg[0] == '[' {
var parsedData interface{}
err := json.Unmarshal(gm.Msg, &parsedData)
if err != nil {
utils.LavaFormatError("failed to unmarshal GetParams", err)
return nil
}
return parsedData
}
return parsedData
}
parsedData, err := gm.dynamicResolve()
if err != nil {
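
The GetParams change above wraps the first-byte check in a len(gm.Msg) > 0 guard, so an empty message can no longer cause an index-out-of-range panic. Below is a minimal standalone sketch of the same guard, using a hypothetical parseIfJSON helper rather than the repository types.

package main

import (
	"encoding/json"
	"fmt"
)

// parseIfJSON only attempts json.Unmarshal when the payload is non-empty
// and starts with '{' or '[', mirroring the guarded check in GetParams.
func parseIfJSON(msg []byte) (interface{}, bool) {
	if len(msg) > 0 && (msg[0] == '{' || msg[0] == '[') {
		var parsed interface{}
		if err := json.Unmarshal(msg, &parsed); err != nil {
			return nil, false
		}
		return parsed, true
	}
	return nil, false
}

func main() {
	fmt.Println(parseIfJSON([]byte(`{"a":1}`))) // map[a:1] true
	fmt.Println(parseIfJSON(nil))               // <nil> false, no panic on empty input
}
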
14 changes: 13 additions & 1 deletion protocol/chainlib/grpcproxy/grpcproxy.go
@@ -3,6 +3,7 @@ package grpcproxy
import (
"context"
"net/http"
"strings"

"github.com/gofiber/fiber/v2"
"github.com/improbable-eng/grpc-web/go/grpcweb"
@@ -79,7 +80,18 @@ func makeProxyFunc(callBack ProxyCallBack) grpc.StreamHandler {
if err != nil {
return err
}
stream.SetHeader(md)

// Convert metadata keys to lowercase
lowercaseMD := metadata.New(map[string]string{})
for k, v := range md {
lowerKey := strings.ToLower(k)
lowercaseMD[lowerKey] = v
}
md = lowercaseMD

if err := stream.SetHeader(md); err != nil {
utils.LavaFormatError("Got error when setting header", err, utils.LogAttr("headers", md))
}
return stream.SendMsg(respBytes)
}
}
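
The proxy change above copies the response metadata into a new metadata.MD with lowercased keys before calling stream.SetHeader, and logs instead of failing when SetHeader returns an error. A minimal sketch of the key-lowercasing step in isolation follows; note that metadata.New also lowercases the keys of the map it is given, so the manual loop mainly matters for an MD that was built some other way.

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/metadata"
)

// lowercaseKeys returns a copy of md whose keys are all lowercase.
func lowercaseKeys(md metadata.MD) metadata.MD {
	out := metadata.New(map[string]string{})
	for k, v := range md {
		out[strings.ToLower(k)] = v // metadata.MD is map[string][]string
	}
	return out
}

func main() {
	md := metadata.MD{"X-Cosmos-Block-Height": {"100"}}
	fmt.Println(lowercaseKeys(md)) // map[x-cosmos-block-height:[100]]
}
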
2 changes: 1 addition & 1 deletion protocol/chainlib/jsonRPC.go
@@ -112,7 +112,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
// Check api is supported and save it in nodeMsg
apiCont, err := apip.getSupportedApi(msg.Method, connectionType, internalPath)
if err != nil {
utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
utils.LavaFormatDebug("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
return nil, err
}

2 changes: 1 addition & 1 deletion protocol/chainlib/tendermintRPC.go
@@ -139,7 +139,7 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect
// Check api is supported and save it in nodeMsg
apiCont, err := apip.getSupportedApi(msg.Method, connectionType)
if err != nil {
utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
utils.LavaFormatDebug("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
return nil, err
}

1 change: 1 addition & 0 deletions protocol/common/endpoints.go
@@ -29,6 +29,7 @@ const (
RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout"
EXTENSION_OVERRIDE_HEADER_NAME = "lava-extension"
FORCE_CACHE_REFRESH_HEADER_NAME = "lava-force-cache-refresh"
LAVA_DEBUG = "lava-debug"
// send http request to /lava/health to see if the process is up - (ret code 200)
DEFAULT_HEALTH_PATH = "/lava/health"
MAXIMUM_ALLOWED_TIMEOUT_EXTEND_MULTIPLIER_BY_THE_CONSUMER = 4
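
The new LAVA_DEBUG constant only defines the "lava-debug" header name; what the consumer does when the header is present is not part of this file's diff. The sketch below merely shows a client attaching that header to a request against a consumer endpoint, with a placeholder address and an assumed value format.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// The listen address is a placeholder, not a real endpoint.
	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:3333/status", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("lava-debug", "true") // the value format is an assumption
	fmt.Println(req.Header)
}
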
4 changes: 2 additions & 2 deletions protocol/integration/protocol_test.go
@@ -156,7 +156,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2)
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil)
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test")
consumerSessionManager.UpdateAllProviders(epoch, pairingList)

consumerConsistency := rpcconsumer.NewConsumerConsistency(specId)
@@ -440,7 +440,7 @@ func TestConsumerProviderWithProviders(t *testing.T) {
seenError := false
statuses := map[int]struct{}{}
for i := 0; i <= 100; i++ {
client := http.Client{Timeout: 500 * time.Millisecond}
client := http.Client{}
req, err := http.NewRequest("GET", "http://"+consumerListenAddress+"/status", nil)
require.NoError(t, err)

12 changes: 6 additions & 6 deletions protocol/lavaprotocol/finalization_consensus.go
@@ -20,7 +20,7 @@ type FinalizationConsensus struct {
providerDataContainersMu sync.RWMutex
currentEpoch uint64
latestBlockByMedian uint64 // for caching
specId string
SpecId string
}

type ProviderHashesConsensus struct {
@@ -41,7 +41,7 @@ type providerDataContainer struct {
}

func NewFinalizationConsensus(specId string) *FinalizationConsensus {
return &FinalizationConsensus{specId: specId}
return &FinalizationConsensus{SpecId: specId}
}

func GetLatestFinalizedBlock(latestBlock, blockDistanceForFinalizedData int64) int64 {
@@ -150,7 +150,7 @@ func (fc *FinalizationConsensus) UpdateFinalizedHashes(blockDistanceForFinalized
}
}
if debug {
utils.LavaFormatDebug("finalization information update successfully", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "finalization data", Value: finalizedBlocks}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus})
utils.LavaFormatDebug("finalization information update successfully", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "finalization data", Value: finalizedBlocks}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus})
}
return finalizationConflict, nil
}
@@ -185,7 +185,7 @@ func (fc *FinalizationConsensus) NewEpoch(epoch uint64) {

if fc.currentEpoch < epoch {
if debug {
utils.LavaFormatDebug("finalization information epoch changed", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "epoch", Value: epoch})
utils.LavaFormatDebug("finalization information epoch changed", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "epoch", Value: epoch})
}
// means it's time to refresh the epoch
fc.prevEpochProviderHashesConsensus = fc.currentProviderHashesConsensus
@@ -256,11 +256,11 @@ func (fc *FinalizationConsensus) ExpectedBlockHeight(chainParser chainlib.ChainP
medianOfExpectedBlocks := median(mapExpectedBlockHeights)
providersMedianOfLatestBlock := medianOfExpectedBlocks + int64(blockDistanceForFinalizedData)
if debug {
utils.LavaFormatDebug("finalization information", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "mapExpectedBlockHeights", Value: mapExpectedBlockHeights}, utils.Attribute{Key: "medianOfExpectedBlocks", Value: medianOfExpectedBlocks}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
utils.LavaFormatDebug("finalization information", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "mapExpectedBlockHeights", Value: mapExpectedBlockHeights}, utils.Attribute{Key: "medianOfExpectedBlocks", Value: medianOfExpectedBlocks}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
}
if medianOfExpectedBlocks > 0 && uint64(providersMedianOfLatestBlock) > fc.latestBlockByMedian {
if uint64(providersMedianOfLatestBlock) > fc.latestBlockByMedian+1000 && fc.latestBlockByMedian > 0 {
utils.LavaFormatError("uncontinuous jump in finalization data", nil, utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "s.prevEpochProviderHashesConsensus", Value: fc.prevEpochProviderHashesConsensus}, utils.Attribute{Key: "s.currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
utils.LavaFormatError("uncontinuous jump in finalization data", nil, utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "s.prevEpochProviderHashesConsensus", Value: fc.prevEpochProviderHashesConsensus}, utils.Attribute{Key: "s.currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
}
atomic.StoreUint64(&fc.latestBlockByMedian, uint64(providersMedianOfLatestBlock)) // we can only set conflict to "reported".
}
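
ExpectedBlockHeight above takes the median of the providers' expected block heights and then adds blockDistanceForFinalizedData. The median helper's implementation is not shown in this diff; the sketch below is just one plausible way to compute such a median over a map keyed by provider address.

package main

import (
	"fmt"
	"sort"
)

// medianOf collects the heights, sorts them, and returns the middle value
// (upper median for even-sized inputs). Returns 0 for an empty map.
func medianOf(heights map[string]int64) int64 {
	if len(heights) == 0 {
		return 0
	}
	values := make([]int64, 0, len(heights))
	for _, h := range heights {
		values = append(values, h)
	}
	sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
	return values[len(values)/2]
}

func main() {
	fmt.Println(medianOf(map[string]int64{"p1": 100, "p2": 102, "p3": 101})) // 101
}
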
44 changes: 14 additions & 30 deletions protocol/lavasession/consumer_session_manager.go
@@ -54,6 +54,7 @@ type ConsumerSessionManager struct {
pairingPurge map[string]*ConsumerSessionsWithProvider
providerOptimizer ProviderOptimizer
consumerMetricsManager *metrics.ConsumerMetricsManager
consumerPublicAddress string
}

// this is being read in multiple locations and but never changes so no need to lock.
@@ -321,6 +322,9 @@ func (csm *ConsumerSessionManager) resetValidAddresses(addon string, extensions
utils.LavaFormatWarning("Provider pairing list is empty, resetting state.", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions})
} else {
utils.LavaFormatWarning("No providers for asked addon or extension, list is empty after trying to reset", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions})
if addon == "" && len(extensions) == 0 {
utils.LavaFormatError("User subscription might have expired or not purchased properly, pairing list is empty after reset.", nil, utils.LogAttr("consumer_address", csm.consumerPublicAddress))
}
}
csm.numberOfResets += 1
}
@@ -350,6 +354,12 @@ func (csm *ConsumerSessionManager) validatePairingListNotEmpty(addon string, ext
return numberOfResets
}

func (csm *ConsumerSessionManager) getValidAddressesLengthForExtensionOrAddon(addon string, extensions []string) int {
csm.lock.RLock()
defer csm.lock.RUnlock()
return len(csm.getValidAddresses(addon, extensions))
}

func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders UsedProvidersInf, tempIgnoredProviders *ignoredProviders, cuNeededForSession uint64, requestedBlock int64, addon string, extensionNames []string, stateful uint32, virtualEpoch uint64) (sessionWithProviderMap SessionWithProviderMap, err error) {
sessionWithProviderMap, err = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch)
if err != nil {
@@ -358,24 +368,7 @@ func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders U
var errOnRetry error
sessionWithProviderMap, errOnRetry = csm.tryGetConsumerSessionWithProviderFromBlockedProviderList(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch, usedProviders)
if errOnRetry != nil {
// we validate currently used providers are 0 meaning we didn't find a valid extension provider
// so we don't return an invalid error while waiting for a reply for a valid provider.
// in the case we do not have any relays sent at the moment we get a provider from the regular list
if usedProviders.CurrentlyUsed() == 0 && PairingListEmptyError.Is(errOnRetry) && (len(extensionNames) > 0) {
var errGetRegularProvider error
emptyExtensionNames := []string{}
sessionWithProviderMap, errGetRegularProvider = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, emptyExtensionNames, stateful, virtualEpoch)
if errGetRegularProvider != nil {
return nil, err // return original error (getValidConsumerSessionsWithProvider)
}
for key := range sessionWithProviderMap {
sessionWithProviderMap[key].RemoveExtensions = true
}
// print a warning in case we got a provider who does not support that addon or extension.
utils.LavaFormatWarning("No Providers For Addon Or Extension, using regular provider for relay", errOnRetry, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensionNames), utils.LogAttr("providers_chosen", sessionWithProviderMap))
} else {
return nil, err // return original error (getValidConsumerSessionsWithProvider)
}
return nil, err // return original error (getValidConsumerSessionsWithProvider)
}
} else {
return nil, err
@@ -422,15 +415,6 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessions := make(ConsumerSessionsMap, wantedSession)
for {
for providerAddress, sessionWithProvider := range sessionWithProviderMap {
// adding a protection when using RemoveAddonsAndExtensions to use only one session.
// we can get here if we wanted 3 archive and got 2 only because one couldn't connect,
// so we tried getting more sessions and got a regular provider due to no pairings available.
// in that case just return the current sessions that we do have.
if sessionWithProvider.RemoveExtensions && len(sessions) >= 1 {
utils.LavaFormatDebug("Too many sessions when using RemoveAddonAndExtensions session", utils.LogAttr("sessions", sessions), utils.LogAttr("wanted_to_add", sessionWithProvider))
// in that case we just return the sessions we already have.
return sessions, nil
}
// Extract values from session with provider
consumerSessionsWithProvider := sessionWithProvider.SessionsWithProvider
sessionEpoch := sessionWithProvider.CurrentEpoch
@@ -527,7 +511,6 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
Session: consumerSession,
Epoch: sessionEpoch,
ReportedProviders: reportedProviders,
RemoveExtensions: sessionWithProvider.RemoveExtensions,
}

// adding qos summery for error parsing.
@@ -709,7 +692,7 @@ func (csm *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredP
// Fetch provider addresses
providerAddresses, err := csm.getValidProviderAddresses(ignoredProviders.providers, cuNeededForSession, requestedBlock, addon, extensions, stateful)
if err != nil {
utils.LavaFormatInfo(csm.rpcEndpoint.ChainID+" could not get a provider addresses", utils.LogAttr("error", err))
utils.LavaFormatDebug(csm.rpcEndpoint.ChainID+" could not get a provider addresses", utils.LogAttr("error", err))
return nil, err
}

@@ -1050,10 +1033,11 @@ func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWit
}
}

func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter) *ConsumerSessionManager {
func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter, consumerPublicAddress string) *ConsumerSessionManager {
csm := &ConsumerSessionManager{
reportedProviders: NewReportedProviders(reporter),
consumerMetricsManager: consumerMetricsManager,
consumerPublicAddress: consumerPublicAddress,
}
csm.rpcEndpoint = rpcEndpoint
csm.providerOptimizer = providerOptimizer
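
The new getValidAddressesLengthForExtensionOrAddon helper takes the manager's read lock before measuring the valid-address list, so callers can poll the length without holding the lock themselves. Below is a simplified sketch of that read-locked accessor pattern, with stand-in types instead of the real ConsumerSessionManager.

package main

import (
	"fmt"
	"sync"
)

type addressBook struct {
	lock      sync.RWMutex
	addresses []string
}

// lengthSnapshot reads the list length under a read lock, matching the
// shape of the new helper in the consumer session manager.
func (a *addressBook) lengthSnapshot() int {
	a.lock.RLock()
	defer a.lock.RUnlock()
	return len(a.addresses)
}

func main() {
	ab := &addressBook{addresses: []string{"provider1", "provider2"}}
	fmt.Println(ab.lengthSnapshot()) // 2
}
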
41 changes: 22 additions & 19 deletions protocol/lavasession/consumer_session_manager_test.go
@@ -40,6 +40,9 @@
maxCuForVirtualEpoch = uint64(200)
)

// This variable will hold grpc server address
var grpcListener = "localhost:0"

type testServer struct {
delay time.Duration
}
@@ -88,41 +91,44 @@ func TestHappyFlow(t *testing.T) {
}
}

func TestExtensionDoesNotExistOnPairingList(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
ext := []*spectypes.Extension{{Name: "test_non_existing_ex", Rule: &spectypes.Rule{Block: 555}, CuMultiplier: 5}}
_, err = csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", ext, common.NO_STATE, 0) // get a session
// if we got a session successfully we should get no error.
require.NoError(t, err)
}

func getDelayedAddress() string {
delayedServerAddress := "127.0.0.1:3335"
// because grpcListener is random we might have overlap. in that case just change the port.
if grpcListener == delayedServerAddress {
delayedServerAddress = "127.0.0.1:3336"
}
utils.LavaFormatDebug("delayedAddress Chosen", utils.LogAttr("address", delayedServerAddress))
return delayedServerAddress
}

func TestEndpointSortingFlow(t *testing.T) {
delayedAddress := getDelayedAddress()
err := createGRPCServer(delayedAddress, time.Millisecond)
err := createGRPCServer(delayedAddress, 300*time.Millisecond)
csp := &ConsumerSessionsWithProvider{}
for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, delayedAddress)
if err != nil {
utils.LavaFormatDebug("waiting for grpc server to launch")
utils.LavaFormatDebug("delayedAddress - waiting for grpc server to launch")
continue
}
utils.LavaFormatDebug("delayedAddress - grpc server is live", utils.LogAttr("address", delayedAddress))
cancel()
break
}

for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, grpcListener)
if err != nil {
utils.LavaFormatDebug("grpcListener - waiting for grpc server to launch")
continue
}
utils.LavaFormatDebug("grpcListener - grpc server is live", utils.LogAttr("address", grpcListener))
cancel()
break
}

require.NoError(t, err)
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
@@ -139,7 +145,7 @@

// because probing is in a routine we need to wait for the sorting and probing to end asynchronously
swapped := false
for i := 0; i < 10; i++ {
for i := 0; i < 20; i++ {
if pairingList[0].Endpoints[0].NetworkAddress == grpcListener {
fmt.Println("Endpoints Are Sorted!", i)
swapped = true
@@ -152,13 +158,10 @@
// after creating all the sessions
}

// This variable will hold grpc server address
var grpcListener = "localhost:0"

func CreateConsumerSessionManager() *ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil)
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test")
}

func TestMain(m *testing.M) {
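
TestEndpointSortingFlow above loops until both gRPC servers accept a connection before running the sorting assertions. Below is a standalone sketch of that wait-until-reachable loop, bounded by an attempt count and written against the stock grpc-go dialer rather than the repository's ConnectRawClientWithTimeout; addresses and timeouts are placeholders.

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// waitForServer dials the address with a blocking, per-attempt timeout and
// returns nil once a connection succeeds, or an error after all attempts.
func waitForServer(address string, attempts int) error {
	for i := 0; i < attempts; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		conn, err := grpc.DialContext(ctx, address,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithBlock())
		cancel()
		if err == nil {
			return conn.Close()
		}
		fmt.Println("waiting for grpc server to launch")
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("server at %s never became reachable", address)
}

func main() {
	_ = waitForServer("localhost:3336", 10)
}
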