Skip to content

Commit

Permalink
feat: PRT adding errored and blocked providers to headers and fixing …
Browse files Browse the repository at this point in the history
…archive issue (#1459)

* PRT-adding-errored-and-blocked-providers-to-headers

* changing log level from info to debug on spammy logs

* fixing protocol test

* removing remove extensions flow from the csm. managing it on rpcconsumer

* removing deprecated types

* adding new flag to determine if session degradation is allowed

* extension disabling flow is now on rpc consumer server which makes more sense.

* adding more info when failing to fetch pairing.

* fix test

* fix test

* adding comment

* register for pairing updates now requires chain id

* exposing spec id

* registering emergency tracker to all chains

* finalization consensus updater spec id

* spec id to pairing updater

* get pairing does not allow empty anymore.

* fix comment

* rename

* making error more robust

* a

* setting timeout for get pairing

* changing log to warning and making it more clear

* retry on register pairing.

* robustness overload

* ..

* fixing spec for cosmoshub

* fixing grpc header issue on response.

* fix test

* lava debug

* fixing name

* fixing small bugs

* lint
  • Loading branch information
ranlavanet authored Jun 5, 2024
1 parent a1630e4 commit 0c35e16
Show file tree
Hide file tree
Showing 24 changed files with 205 additions and 156 deletions.
6 changes: 4 additions & 2 deletions cookbook/specs/cosmossdk.json
Original file line number Diff line number Diff line change
Expand Up @@ -2176,7 +2176,8 @@
},
{
"name": "grpc-metadata-x-cosmos-block-height",
"kind": "pass_both"
"kind": "pass_both",
"function_tag": "SET_LATEST_IN_METADATA"
}
],
"inheritance_apis": [],
Expand Down Expand Up @@ -4688,7 +4689,8 @@
"headers": [
{
"name": "x-cosmos-block-height",
"kind": "pass_both"
"kind": "pass_both",
"function_tag": "SET_LATEST_IN_METADATA"
},
{
"name": "grpc-metadata-x-cosmos-block-height",
Expand Down
16 changes: 9 additions & 7 deletions protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ func (jm GrpcMessage) CheckResponseError(data []byte, httpStatusCode int) (hasEr
// GetParams will be deprecated after we remove old client
// Currently needed because of parser.RPCInput interface
func (gm GrpcMessage) GetParams() interface{} {
if gm.Msg[0] == '{' || gm.Msg[0] == '[' {
var parsedData interface{}
err := json.Unmarshal(gm.Msg, &parsedData)
if err != nil {
utils.LavaFormatError("failed to unmarshal GetParams", err)
return nil
if len(gm.Msg) > 0 {
if gm.Msg[0] == '{' || gm.Msg[0] == '[' {
var parsedData interface{}
err := json.Unmarshal(gm.Msg, &parsedData)
if err != nil {
utils.LavaFormatError("failed to unmarshal GetParams", err)
return nil
}
return parsedData
}
return parsedData
}
parsedData, err := gm.dynamicResolve()
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion protocol/chainlib/grpcproxy/grpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpcproxy
import (
"context"
"net/http"
"strings"

"github.com/gofiber/fiber/v2"
"github.com/improbable-eng/grpc-web/go/grpcweb"
Expand Down Expand Up @@ -79,7 +80,18 @@ func makeProxyFunc(callBack ProxyCallBack) grpc.StreamHandler {
if err != nil {
return err
}
stream.SetHeader(md)

// Convert metadata keys to lowercase
lowercaseMD := metadata.New(map[string]string{})
for k, v := range md {
lowerKey := strings.ToLower(k)
lowercaseMD[lowerKey] = v
}
md = lowercaseMD

if err := stream.SetHeader(md); err != nil {
utils.LavaFormatError("Got error when setting header", err, utils.LogAttr("headers", md))
}
return stream.SendMsg(respBytes)
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
// Check api is supported and save it in nodeMsg
apiCont, err := apip.getSupportedApi(msg.Method, connectionType, internalPath)
if err != nil {
utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
utils.LavaFormatDebug("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect
// Check api is supported and save it in nodeMsg
apiCont, err := apip.getSupportedApi(msg.Method, connectionType)
if err != nil {
utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
utils.LavaFormatDebug("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout"
EXTENSION_OVERRIDE_HEADER_NAME = "lava-extension"
FORCE_CACHE_REFRESH_HEADER_NAME = "lava-force-cache-refresh"
LAVA_DEBUG = "lava-debug"
// send http request to /lava/health to see if the process is up - (ret code 200)
DEFAULT_HEALTH_PATH = "/lava/health"
MAXIMUM_ALLOWED_TIMEOUT_EXTEND_MULTIPLIER_BY_THE_CONSUMER = 4
Expand Down
4 changes: 2 additions & 2 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2)
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil)
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test")
consumerSessionManager.UpdateAllProviders(epoch, pairingList)

consumerConsistency := rpcconsumer.NewConsumerConsistency(specId)
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestConsumerProviderWithProviders(t *testing.T) {
seenError := false
statuses := map[int]struct{}{}
for i := 0; i <= 100; i++ {
client := http.Client{Timeout: 500 * time.Millisecond}
client := http.Client{}
req, err := http.NewRequest("GET", "http://"+consumerListenAddress+"/status", nil)
require.NoError(t, err)

Expand Down
12 changes: 6 additions & 6 deletions protocol/lavaprotocol/finalization_consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type FinalizationConsensus struct {
providerDataContainersMu sync.RWMutex
currentEpoch uint64
latestBlockByMedian uint64 // for caching
specId string
SpecId string
}

type ProviderHashesConsensus struct {
Expand All @@ -41,7 +41,7 @@ type providerDataContainer struct {
}

func NewFinalizationConsensus(specId string) *FinalizationConsensus {
return &FinalizationConsensus{specId: specId}
return &FinalizationConsensus{SpecId: specId}
}

func GetLatestFinalizedBlock(latestBlock, blockDistanceForFinalizedData int64) int64 {
Expand Down Expand Up @@ -150,7 +150,7 @@ func (fc *FinalizationConsensus) UpdateFinalizedHashes(blockDistanceForFinalized
}
}
if debug {
utils.LavaFormatDebug("finalization information update successfully", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "finalization data", Value: finalizedBlocks}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus})
utils.LavaFormatDebug("finalization information update successfully", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "finalization data", Value: finalizedBlocks}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus})
}
return finalizationConflict, nil
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (fc *FinalizationConsensus) NewEpoch(epoch uint64) {

if fc.currentEpoch < epoch {
if debug {
utils.LavaFormatDebug("finalization information epoch changed", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "epoch", Value: epoch})
utils.LavaFormatDebug("finalization information epoch changed", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "epoch", Value: epoch})
}
// means it's time to refresh the epoch
fc.prevEpochProviderHashesConsensus = fc.currentProviderHashesConsensus
Expand Down Expand Up @@ -256,11 +256,11 @@ func (fc *FinalizationConsensus) ExpectedBlockHeight(chainParser chainlib.ChainP
medianOfExpectedBlocks := median(mapExpectedBlockHeights)
providersMedianOfLatestBlock := medianOfExpectedBlocks + int64(blockDistanceForFinalizedData)
if debug {
utils.LavaFormatDebug("finalization information", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "mapExpectedBlockHeights", Value: mapExpectedBlockHeights}, utils.Attribute{Key: "medianOfExpectedBlocks", Value: medianOfExpectedBlocks}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
utils.LavaFormatDebug("finalization information", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "mapExpectedBlockHeights", Value: mapExpectedBlockHeights}, utils.Attribute{Key: "medianOfExpectedBlocks", Value: medianOfExpectedBlocks}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
}
if medianOfExpectedBlocks > 0 && uint64(providersMedianOfLatestBlock) > fc.latestBlockByMedian {
if uint64(providersMedianOfLatestBlock) > fc.latestBlockByMedian+1000 && fc.latestBlockByMedian > 0 {
utils.LavaFormatError("uncontinuous jump in finalization data", nil, utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "s.prevEpochProviderHashesConsensus", Value: fc.prevEpochProviderHashesConsensus}, utils.Attribute{Key: "s.currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
utils.LavaFormatError("uncontinuous jump in finalization data", nil, utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "s.prevEpochProviderHashesConsensus", Value: fc.prevEpochProviderHashesConsensus}, utils.Attribute{Key: "s.currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock})
}
atomic.StoreUint64(&fc.latestBlockByMedian, uint64(providersMedianOfLatestBlock)) // we can only set conflict to "reported".
}
Expand Down
44 changes: 14 additions & 30 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ConsumerSessionManager struct {
pairingPurge map[string]*ConsumerSessionsWithProvider
providerOptimizer ProviderOptimizer
consumerMetricsManager *metrics.ConsumerMetricsManager
consumerPublicAddress string
}

// this is being read in multiple locations and but never changes so no need to lock.
Expand Down Expand Up @@ -321,6 +322,9 @@ func (csm *ConsumerSessionManager) resetValidAddresses(addon string, extensions
utils.LavaFormatWarning("Provider pairing list is empty, resetting state.", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions})
} else {
utils.LavaFormatWarning("No providers for asked addon or extension, list is empty after trying to reset", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions})
if addon == "" && len(extensions) == 0 {
utils.LavaFormatError("User subscription might have expired or not purchased properly, pairing list is empty after reset.", nil, utils.LogAttr("consumer_address", csm.consumerPublicAddress))
}
}
csm.numberOfResets += 1
}
Expand Down Expand Up @@ -350,6 +354,12 @@ func (csm *ConsumerSessionManager) validatePairingListNotEmpty(addon string, ext
return numberOfResets
}

// getValidAddressesLengthForExtensionOrAddon returns how many provider addresses
// are currently valid for the given addon and extensions. It takes the session
// manager's read lock so the count is consistent with concurrent pairing updates.
func (csm *ConsumerSessionManager) getValidAddressesLengthForExtensionOrAddon(addon string, extensions []string) int {
csm.lock.RLock()
defer csm.lock.RUnlock()
return len(csm.getValidAddresses(addon, extensions))
}

func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders UsedProvidersInf, tempIgnoredProviders *ignoredProviders, cuNeededForSession uint64, requestedBlock int64, addon string, extensionNames []string, stateful uint32, virtualEpoch uint64) (sessionWithProviderMap SessionWithProviderMap, err error) {
sessionWithProviderMap, err = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch)
if err != nil {
Expand All @@ -358,24 +368,7 @@ func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders U
var errOnRetry error
sessionWithProviderMap, errOnRetry = csm.tryGetConsumerSessionWithProviderFromBlockedProviderList(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch, usedProviders)
if errOnRetry != nil {
// we validate currently used providers are 0 meaning we didn't find a valid extension provider
// so we don't return an invalid error while waiting for a reply for a valid provider.
// in the case we do not have any relays sent at the moment we get a provider from the regular list
if usedProviders.CurrentlyUsed() == 0 && PairingListEmptyError.Is(errOnRetry) && (len(extensionNames) > 0) {
var errGetRegularProvider error
emptyExtensionNames := []string{}
sessionWithProviderMap, errGetRegularProvider = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, emptyExtensionNames, stateful, virtualEpoch)
if errGetRegularProvider != nil {
return nil, err // return original error (getValidConsumerSessionsWithProvider)
}
for key := range sessionWithProviderMap {
sessionWithProviderMap[key].RemoveExtensions = true
}
// print a warning in case we got a provider who does not support that addon or extension.
utils.LavaFormatWarning("No Providers For Addon Or Extension, using regular provider for relay", errOnRetry, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensionNames), utils.LogAttr("providers_chosen", sessionWithProviderMap))
} else {
return nil, err // return original error (getValidConsumerSessionsWithProvider)
}
return nil, err // return original error (getValidConsumerSessionsWithProvider)
}
} else {
return nil, err
Expand Down Expand Up @@ -422,15 +415,6 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessions := make(ConsumerSessionsMap, wantedSession)
for {
for providerAddress, sessionWithProvider := range sessionWithProviderMap {
// adding a protection when using RemoveAddonsAndExtensions to use only one session.
// we can get here if we wanted 3 archive and got 2 only because one couldn't connect,
// so we tried getting more sessions and got a regular provider due to no pairings available.
// in that case just return the current sessions that we do have.
if sessionWithProvider.RemoveExtensions && len(sessions) >= 1 {
utils.LavaFormatDebug("Too many sessions when using RemoveAddonAndExtensions session", utils.LogAttr("sessions", sessions), utils.LogAttr("wanted_to_add", sessionWithProvider))
// in that case we just return the sessions we already have.
return sessions, nil
}
// Extract values from session with provider
consumerSessionsWithProvider := sessionWithProvider.SessionsWithProvider
sessionEpoch := sessionWithProvider.CurrentEpoch
Expand Down Expand Up @@ -527,7 +511,6 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
Session: consumerSession,
Epoch: sessionEpoch,
ReportedProviders: reportedProviders,
RemoveExtensions: sessionWithProvider.RemoveExtensions,
}

// adding qos summery for error parsing.
Expand Down Expand Up @@ -709,7 +692,7 @@ func (csm *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredP
// Fetch provider addresses
providerAddresses, err := csm.getValidProviderAddresses(ignoredProviders.providers, cuNeededForSession, requestedBlock, addon, extensions, stateful)
if err != nil {
utils.LavaFormatInfo(csm.rpcEndpoint.ChainID+" could not get a provider addresses", utils.LogAttr("error", err))
utils.LavaFormatDebug(csm.rpcEndpoint.ChainID+" could not get a provider addresses", utils.LogAttr("error", err))
return nil, err
}

Expand Down Expand Up @@ -1050,10 +1033,11 @@ func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWit
}
}

func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter) *ConsumerSessionManager {
func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter, consumerPublicAddress string) *ConsumerSessionManager {
csm := &ConsumerSessionManager{
reportedProviders: NewReportedProviders(reporter),
consumerMetricsManager: consumerMetricsManager,
consumerPublicAddress: consumerPublicAddress,
}
csm.rpcEndpoint = rpcEndpoint
csm.providerOptimizer = providerOptimizer
Expand Down
41 changes: 22 additions & 19 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
maxCuForVirtualEpoch = uint64(200)
)

// This variable will hold grpc server address
var grpcListener = "localhost:0"

type testServer struct {
delay time.Duration
}
Expand Down Expand Up @@ -88,41 +91,44 @@ func TestHappyFlow(t *testing.T) {
}
}

func TestExtensionDoesNotExistOnPairingList(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
ext := []*spectypes.Extension{{Name: "test_non_existing_ex", Rule: &spectypes.Rule{Block: 555}, CuMultiplier: 5}}
_, err = csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", ext, common.NO_STATE, 0) // get a session
// if we got a session successfully we should get no error.
require.NoError(t, err)
}

// getDelayedAddress returns a fixed local address for the intentionally-delayed
// test gRPC server. grpcListener is bound to a random port ("localhost:0"), so
// on the rare collision with the default port we fall back to an alternate one.
func getDelayedAddress() string {
delayedServerAddress := "127.0.0.1:3335"
// because grpcListener is random we might have overlap. in that case just change the port.
if grpcListener == delayedServerAddress {
delayedServerAddress = "127.0.0.1:3336"
}
utils.LavaFormatDebug("delayedAddress Chosen", utils.LogAttr("address", delayedServerAddress))
return delayedServerAddress
}

func TestEndpointSortingFlow(t *testing.T) {
delayedAddress := getDelayedAddress()
err := createGRPCServer(delayedAddress, time.Millisecond)
err := createGRPCServer(delayedAddress, 300*time.Millisecond)
csp := &ConsumerSessionsWithProvider{}
for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, delayedAddress)
if err != nil {
utils.LavaFormatDebug("waiting for grpc server to launch")
utils.LavaFormatDebug("delayedAddress - waiting for grpc server to launch")
continue
}
utils.LavaFormatDebug("delayedAddress - grpc server is live", utils.LogAttr("address", delayedAddress))
cancel()
break
}

for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, grpcListener)
if err != nil {
utils.LavaFormatDebug("grpcListener - waiting for grpc server to launch")
continue
}
utils.LavaFormatDebug("grpcListener - grpc server is live", utils.LogAttr("address", grpcListener))
cancel()
break
}

require.NoError(t, err)
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
Expand All @@ -139,7 +145,7 @@ func TestEndpointSortingFlow(t *testing.T) {

// because probing is in a routine we need to wait for the sorting and probing to end asynchronously
swapped := false
for i := 0; i < 10; i++ {
for i := 0; i < 20; i++ {
if pairingList[0].Endpoints[0].NetworkAddress == grpcListener {
fmt.Println("Endpoints Are Sorted!", i)
swapped = true
Expand All @@ -152,13 +158,10 @@ func TestEndpointSortingFlow(t *testing.T) {
// after creating all the sessions
}

// This variable will hold grpc server address
var grpcListener = "localhost:0"

func CreateConsumerSessionManager() *ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil)
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test")
}

func TestMain(m *testing.M) {
Expand Down
Loading

0 comments on commit 0c35e16

Please sign in to comment.