Skip to content

Commit

Permalink
feat: PRT - adding debug headers and fixing nil deref on response nil (
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet authored Aug 26, 2024
1 parent 395a054 commit 802730b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 11 deletions.
5 changes: 4 additions & 1 deletion protocol/chainlib/protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func (bpm *BaseProtocolMessage) GetBlockedProviders() []string {
}
blockedProviders, ok := bpm.directiveHeaders[common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME]
if ok {
return strings.Split(blockedProviders, ",")
blockProviders := strings.Split(blockedProviders, ",")
if len(blockProviders) <= 2 {
return blockProviders
}
}
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ const (
PROVIDER_LATEST_BLOCK_HEADER_NAME = "Provider-Latest-Block"
GUID_HEADER_NAME = "Lava-Guid"
ERRORED_PROVIDERS_HEADER_NAME = "Lava-Errored-Providers"
NODE_ERRORS_PROVIDERS_HEADER_NAME = "Lava-Node-Errors-providers"
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
LAVAP_VERSION_HEADER_NAME = "Lavap-Version"
LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid"
// these headers need to be lowercase
BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "lava-providers-block"
Expand Down
6 changes: 6 additions & 0 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type ConsumerSessionManager struct {
activeSubscriptionProvidersStorage *ActiveSubscriptionProvidersStorage
}

func (csm *ConsumerSessionManager) GetNumberOfValidProviders() int {
csm.lock.RLock()
defer csm.lock.RUnlock()
return len(csm.validAddresses)
}

// this is being read in multiple locations and but never changes so no need to lock.
func (csm *ConsumerSessionManager) RPCEndpoint() RPCEndpoint {
return *csm.rpcEndpoint
Expand Down
50 changes: 40 additions & 10 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lavanet/lava/v2/protocol/lavasession"
"github.com/lavanet/lava/v2/protocol/metrics"
"github.com/lavanet/lava/v2/protocol/performance"
"github.com/lavanet/lava/v2/protocol/upgrade"
"github.com/lavanet/lava/v2/utils"
"github.com/lavanet/lava/v2/utils/protocopy"
"github.com/lavanet/lava/v2/utils/rand"
Expand Down Expand Up @@ -227,7 +228,15 @@ func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retrie
success := false
var err error
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager)
usedProvidersResets := 1
for i := 0; i < retries; i++ {
// Check if we even have enough providers to communicate with them all.
// If we have 1 provider we will reset the used providers always.
// Instead of spamming no pairing available on bootstrap
if ((i + 1) * usedProvidersResets) > rpccs.consumerSessionManager.GetNumberOfValidProviders() {
usedProvidersResets++
relayProcessor.GetUsedProviders().ClearUnwanted()
}
err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil)
if lavasession.PairingListEmptyError.Is(err) {
// we don't have pairings anymore, could be related to unwanted providers
Expand Down Expand Up @@ -1405,19 +1414,40 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, erroredProvidersMD)
}

currentReportedProviders := rpccs.consumerSessionManager.GetReportedProviders(uint64(relayResult.Request.RelaySession.Epoch))
if len(currentReportedProviders) > 0 {
reportedProvidersArray := make([]string, len(currentReportedProviders))
for idx, providerAddress := range currentReportedProviders {
reportedProvidersArray[idx] = providerAddress.Address
nodeErrors := relayProcessor.nodeErrors()
if len(nodeErrors) > 0 {
nodeErrorHeaderString := ""
for _, nodeError := range nodeErrors {
nodeErrorHeaderString += fmt.Sprintf("%s: %s,", nodeError.GetProvider(), string(nodeError.Reply.Data))
}
reportedProvidersString := fmt.Sprintf("%v", reportedProvidersArray)
reportedProvidersMD := pairingtypes.Metadata{
Name: common.REPORTED_PROVIDERS_HEADER_NAME,
Value: reportedProvidersString,
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata,
pairingtypes.Metadata{
Name: common.NODE_ERRORS_PROVIDERS_HEADER_NAME,
Value: nodeErrorHeaderString,
})
}

if relayResult.Request != nil && relayResult.Request.RelaySession != nil {
currentReportedProviders := rpccs.consumerSessionManager.GetReportedProviders(uint64(relayResult.Request.RelaySession.Epoch))
if len(currentReportedProviders) > 0 {
reportedProvidersArray := make([]string, len(currentReportedProviders))
for idx, providerAddress := range currentReportedProviders {
reportedProvidersArray[idx] = providerAddress.Address
}
reportedProvidersString := fmt.Sprintf("%v", reportedProvidersArray)
reportedProvidersMD := pairingtypes.Metadata{
Name: common.REPORTED_PROVIDERS_HEADER_NAME,
Value: reportedProvidersString,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, reportedProvidersMD)
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, reportedProvidersMD)
}

version := pairingtypes.Metadata{
Name: common.LAVAP_VERSION_HEADER_NAME,
Value: upgrade.GetCurrentVersion().ConsumerVersion,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, version)
}

relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, metadataReply...)
Expand Down

0 comments on commit 802730b

Please sign in to comment.