From 526ef73c9794638e4f2f94483a3d2cf957201dca Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 22 Aug 2024 16:49:51 +0200 Subject: [PATCH 01/17] feat: PRT - dappid and consumer ip added to protocol message. --- protocol/chainlib/chainlib.go | 2 - protocol/chainlib/chainlib_mock.go | 8 +-- .../chainlib/consumer_websocket_manager.go | 2 +- .../consumer_ws_subscription_manager.go | 13 +++-- .../consumer_ws_subscription_manager_test.go | 31 +++++----- protocol/chainlib/protocol_message.go | 14 ++++- protocol/rpcconsumer/relay_processor.go | 33 +++++------ protocol/rpcconsumer/relay_processor_test.go | 36 ++++++++---- protocol/rpcconsumer/rpcconsumer_server.go | 57 ++++++++++--------- 9 files changed, 108 insertions(+), 88 deletions(-) diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 8ed037669d..a9639f9b52 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -128,8 +128,6 @@ type RelaySender interface { ) (ProtocolMessage, error) SendParsedRelay( ctx context.Context, - dappID string, - consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage, ) (relayResult *common.RelayResult, errRet error) diff --git a/protocol/chainlib/chainlib_mock.go b/protocol/chainlib/chainlib_mock.go index 757c2cd9e0..2d5dbf74f9 100644 --- a/protocol/chainlib/chainlib_mock.go +++ b/protocol/chainlib/chainlib_mock.go @@ -721,18 +721,18 @@ func (mr *MockRelaySenderMockRecorder) ParseRelay(ctx, url, req, connectionType, } // SendParsedRelay mocks base method. -func (m *MockRelaySender) SendParsedRelay(ctx context.Context, dappID, consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (*common.RelayResult, error) { +func (m *MockRelaySender) SendParsedRelay(ctx context.Context, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (*common.RelayResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendParsedRelay", ctx, dappID, consumerIp, analytics, protocolMessage) + ret := m.ctrl.Call(m, "SendParsedRelay", ctx, analytics, protocolMessage) ret0, _ := ret[0].(*common.RelayResult) ret1, _ := ret[1].(error) return ret0, ret1 } // SendParsedRelay indicates an expected call of SendParsedRelay. -func (mr *MockRelaySenderMockRecorder) SendParsedRelay(ctx, dappID, consumerIp, analytics, protocolMessage interface{}) *gomock.Call { +func (mr *MockRelaySenderMockRecorder) SendParsedRelay(ctx, analytics, protocolMessage interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendParsedRelay", reflect.TypeOf((*MockRelaySender)(nil).SendParsedRelay), ctx, dappID, consumerIp, analytics, protocolMessage) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendParsedRelay", reflect.TypeOf((*MockRelaySender)(nil).SendParsedRelay), ctx, analytics, protocolMessage) } // SendRelay mocks base method. diff --git a/protocol/chainlib/consumer_websocket_manager.go b/protocol/chainlib/consumer_websocket_manager.go index 75dd3ca1ba..aefb3878bc 100644 --- a/protocol/chainlib/consumer_websocket_manager.go +++ b/protocol/chainlib/consumer_websocket_manager.go @@ -182,7 +182,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() { continue } else { // Normal relay over websocket. 
(not subscription related) - relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, protocolMessage) + relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, metricsData, protocolMessage) if err != nil { formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime)) if formatterMsg != nil { diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index dda1405573..0f883e2f09 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -301,7 +301,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( utils.LogAttr("dappKey", dappKey), ) - relayResult, err := cwsm.relaySender.SendParsedRelay(webSocketCtx, dappID, consumerIp, metricsData, protocolMessage) + relayResult, err := cwsm.relaySender.SendParsedRelay(webSocketCtx, metricsData, protocolMessage) if err != nil { onSubscriptionFailure() return nil, nil, utils.LavaFormatError("could not send subscription relay", err) @@ -490,7 +490,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( } unsubscribeRelayCtx := utils.WithUniqueIdentifier(context.Background(), utils.GenerateUniqueIdentifier()) - err = cwsm.sendUnsubscribeMessage(unsubscribeRelayCtx, dappID, userIp, protocolMessage, metricsData) + err = cwsm.sendUnsubscribeMessage(unsubscribeRelayCtx, protocolMessage, metricsData) if err != nil { utils.LavaFormatError("could not send unsubscribe message due to a relay error", err, @@ -702,15 +702,16 @@ func (cwsm *ConsumerWSSubscriptionManager) craftUnsubscribeMessage(hashedParams, return protocolMessage, nil } -func (cwsm *ConsumerWSSubscriptionManager) sendUnsubscribeMessage(ctx context.Context, dappID, consumerIp string, protocolMessage ProtocolMessage, metricsData *metrics.RelayMetrics) error { +func (cwsm *ConsumerWSSubscriptionManager) sendUnsubscribeMessage(ctx context.Context, protocolMessage ProtocolMessage, metricsData *metrics.RelayMetrics) error { // Send the crafted unsubscribe relay + userData := protocolMessage.GetUserData() utils.LavaFormatTrace("sending unsubscribe relay", utils.LogAttr("GUID", ctx), - utils.LogAttr("dappID", dappID), - utils.LogAttr("consumerIp", consumerIp), + utils.LogAttr("dappID", userData.DappId), + utils.LogAttr("consumerIp", userData.ConsumerIp), ) - _, err := cwsm.relaySender.SendParsedRelay(ctx, dappID, consumerIp, metricsData, protocolMessage) + _, err := cwsm.relaySender.SendParsedRelay(ctx, metricsData, protocolMessage) if err != nil { return utils.LavaFormatError("could not send unsubscribe relay", err) } diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 81a59fea87..c1a029ee93 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -67,7 +67,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes chainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil) + protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil, dapp, ip) relaySender := 
NewMockRelaySender(ctrl) relaySender. EXPECT(). @@ -129,7 +129,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -224,7 +224,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { chainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil) + protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil, dapp, "") relaySender := NewMockRelaySender(ctrl) relaySender. EXPECT(). @@ -286,7 +286,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -430,14 +430,15 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { subscribeChainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - subscribeProtocolMessage1 := NewProtocolMessage(subscribeChainMessage1, nil, nil) + subscribeProtocolMessage1 := NewProtocolMessage(subscribeChainMessage1, nil, nil, dapp1, ts.Consumer.Addr.String()) unsubscribeProtocolMessage1 := NewProtocolMessage(unsubscribeChainMessage1, nil, &pairingtypes.RelayPrivateData{ Data: play.unsubscribeMessage1, - }) + }, dapp1, ts.Consumer.Addr.String(), + ) relaySender := NewMockRelaySender(ctrl) relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool { + SendParsedRelay(gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool { protocolMsg, ok := x.(ProtocolMessage) require.True(t, ok) require.NotNil(t, protocolMsg) @@ -530,7 +531,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -552,7 +553,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). 
Times(0) // Should not call SendParsedRelay, because it is already subscribed @@ -578,10 +579,10 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Prepare for the next subscription unsubscribeChainMessage2, err := chainParser.ParseMsg("", play.unsubscribeMessage2, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - unsubscribeProtocolMessage2 := NewProtocolMessage(unsubscribeChainMessage2, nil, &pairingtypes.RelayPrivateData{Data: play.unsubscribeMessage2}) + unsubscribeProtocolMessage2 := NewProtocolMessage(unsubscribeChainMessage2, nil, &pairingtypes.RelayPrivateData{Data: play.unsubscribeMessage2}, dapp2, ts.Consumer.Addr.String()) subscribeChainMessage2, err := chainParser.ParseMsg("", play.subscriptionRequestData2, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - subscribeProtocolMessage2 := NewProtocolMessage(subscribeChainMessage2, nil, nil) + subscribeProtocolMessage2 := NewProtocolMessage(subscribeChainMessage2, nil, nil, dapp2, ts.Consumer.Addr.String()) relaySender. EXPECT(). ParseRelay(gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool { @@ -644,7 +645,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult2, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -664,12 +665,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Prepare for unsubscribe from the first subscription relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(0) // Should call SendParsedRelay, because it unsubscribed ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData) + unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String()) err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil) require.NoError(t, err) @@ -688,7 +689,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Prepare for unsubscribe from the second subscription relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). 
DoAndReturn(func(ctx context.Context, dappID string, consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) { wg.Done() return relayResult2, nil diff --git a/protocol/chainlib/protocol_message.go b/protocol/chainlib/protocol_message.go index c9ed2ea01d..a1b695b848 100644 --- a/protocol/chainlib/protocol_message.go +++ b/protocol/chainlib/protocol_message.go @@ -7,10 +7,20 @@ import ( pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" ) +type UserData struct { + ConsumerIp string + DappId string +} + type BaseProtocolMessage struct { ChainMessage directiveHeaders map[string]string relayRequestData *pairingtypes.RelayPrivateData + userData UserData +} + +func (bpm *BaseProtocolMessage) GetUserData() UserData { + return bpm.userData } func (bpm *BaseProtocolMessage) GetDirectiveHeaders() map[string]string { @@ -36,11 +46,12 @@ func (bpm *BaseProtocolMessage) GetBlockedProviders() []string { return nil } -func NewProtocolMessage(chainMessage ChainMessage, directiveHeaders map[string]string, relayRequestData *pairingtypes.RelayPrivateData) ProtocolMessage { +func NewProtocolMessage(chainMessage ChainMessage, directiveHeaders map[string]string, relayRequestData *pairingtypes.RelayPrivateData, dappId, consumerIp string) ProtocolMessage { return &BaseProtocolMessage{ ChainMessage: chainMessage, directiveHeaders: directiveHeaders, relayRequestData: relayRequestData, + userData: UserData{DappId: dappId, ConsumerIp: consumerIp}, } } @@ -50,4 +61,5 @@ type ProtocolMessage interface { RelayPrivateData() *pairingtypes.RelayPrivateData HashCacheRequest(chainId string) ([]byte, func([]byte) []byte, error) GetBlockedProviders() []string + GetUserData() UserData } diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index f18b276663..e14c6a0a8c 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -48,12 +48,11 @@ type RelayProcessor struct { protocolResponseErrors RelayErrors successResults []common.RelayResult lock sync.RWMutex - chainMessage chainlib.ChainMessage + protocolMessage chainlib.ProtocolMessage guid uint64 selection Selection consumerConsistency *ConsumerConsistency - dappID string - consumerIp string + userData chainlib.UserData skipDataReliability bool debugRelay bool allowSessionDegradation uint32 // used in the scenario where extension was previously used. 
@@ -67,10 +66,8 @@ func NewRelayProcessor( ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, - chainMessage chainlib.ChainMessage, + protocolMessage chainlib.ProtocolMessage, consumerConsistency *ConsumerConsistency, - dappID string, - consumerIp string, debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, @@ -79,7 +76,7 @@ func NewRelayProcessor( ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) selection := Quorum // select the majority of node responses - if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { + if chainlib.GetStateful(protocolMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { selection = BestResult // select the majority of node successes } if requiredSuccesses <= 0 { @@ -91,12 +88,10 @@ func NewRelayProcessor( responses: make(chan *relayResponse, MaxCallsPerRelay), // we set it as buffered so it is not blocking nodeResponseErrors: RelayErrors{relayErrors: []RelayError{}}, protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true}, - chainMessage: chainMessage, + protocolMessage: protocolMessage, guid: guid, selection: selection, consumerConsistency: consumerConsistency, - dappID: dappID, - consumerIp: consumerIp, debugRelay: debugRelay, metricsInf: metricsInf, chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, @@ -213,7 +208,7 @@ func (rp *RelayProcessor) setValidResponse(response *relayResponse) { // future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply // we do not modify the chain message data on the consumer, only it's requested block, so we let the provider know it can't put any block height it wants by setting a specific block height - reqBlock, _ := rp.chainMessage.RequestedBlock() + reqBlock, _ := rp.protocolMessage.RequestedBlock() if reqBlock == spectypes.LATEST_BLOCK { // TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always // modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false) @@ -235,15 +230,15 @@ func (rp *RelayProcessor) setValidResponse(response *relayResponse) { // no error, update the seen block blockSeen := response.relayResult.Reply.LatestBlock // nil safe - rp.consumerConsistency.SetSeenBlock(blockSeen, rp.dappID, rp.consumerIp) + rp.consumerConsistency.SetSeenBlock(blockSeen, rp.userData.DappId, rp.userData.ConsumerIp) // on subscribe results, we just append to successful results instead of parsing results because we already have a validation. - if chainlib.IsFunctionTagOfType(rp.chainMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { + if chainlib.IsFunctionTagOfType(rp.protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { rp.successResults = append(rp.successResults, response.relayResult) return } // check response error - foundError, errorMessage := rp.chainMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode) + foundError, errorMessage := rp.protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode) if foundError { // this is a node error, meaning we still didn't get a good response. 
// we may choose to wait until there will be a response or timeout happens @@ -253,7 +248,7 @@ func (rp *RelayProcessor) setValidResponse(response *relayResponse) { // send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors. if rp.selection != BestResult { go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) - utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.chainMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock)) + utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock)) } return } @@ -305,7 +300,7 @@ func (rp *RelayProcessor) HasResults() bool { } func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { - hash, err := rp.chainMessage.GetRawRequestHash() + hash, err := rp.protocolMessage.GetRawRequestHash() hashString := "" if err == nil { hashString = string(hash) @@ -336,8 +331,8 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // We failed enough times. we need to add this to our hash map so we don't waste time on it again. chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface() utils.LavaFormatWarning("Failed to recover retries on node errors, might be an invalid input", nil, - utils.LogAttr("api", rp.chainMessage.GetApi().Name), - utils.LogAttr("params", rp.chainMessage.GetRPCMessage().GetParams()), + utils.LogAttr("api", rp.protocolMessage.GetApi().Name), + utils.LogAttr("params", rp.protocolMessage.GetRPCMessage().GetParams()), utils.LogAttr("chainId", chainId), utils.LogAttr("apiInterface", apiInterface), utils.LogAttr("hash", hash), @@ -436,7 +431,7 @@ func (rp *RelayProcessor) responsesQuorum(results []common.RelayResult, quorumSi return nil, errors.New("quorumSize must be greater than zero") } countMap := make(map[string]int) // Map to store the count of each unique result.Reply.Data - deterministic := rp.chainMessage.GetApi().Category.Deterministic + deterministic := rp.protocolMessage.GetApi().Category.Deterministic var bestQosResult common.RelayResult bestQos := sdktypes.ZeroDec() nilReplies := 0 diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index bb21a8eda3..ee77131544 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -104,7 +104,8 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ 
-146,7 +147,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -188,7 +190,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -212,7 +215,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -236,7 +240,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // 4th relay, same inputs, this time a successful relay, should remove the hash from the map chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), 
time.Millisecond*10) defer cancel() @@ -285,7 +290,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) relayProcessor.disableRelayRetry = true usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -323,7 +329,8 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -375,7 +382,8 @@ func TestRelayProcessorRetry(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -419,7 +427,8 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -464,7 +473,8 @@ 
func TestRelayProcessorStatefulApi(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -509,7 +519,8 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -553,7 +564,8 @@ func TestRelayProcessorLatest(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 595038b5a7..1a8e4d8d71 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -42,6 +42,8 @@ const ( MaxRelayRetries = 6 SendRelayAttempts = 3 numberOfTimesToCheckCurrentlyUsedIsEmpty = 3 + initRelaysDappId = "-init-" + initRelaysConsumerIp = "" ) var NoResponseTimeout = sdkerrors.New("NoResponseTimeout Error", 685, "timeout occurred while waiting for providers responses") @@ -226,13 +228,13 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, protocolMessage chainlib.ProtocolMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, 
rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) for i := 0; i < retries; i++ { - err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil) + err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, nil) if lavasession.PairingListEmptyError.Is(err) { // we don't have pairings anymore, could be related to unwanted providers relayProcessor.GetUsedProviders().ClearUnwanted() - err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil) + err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, nil) } if err != nil { utils.LavaFormatError("[-] failed sending init relay", err, []utils.Attribute{{Key: "chainID", Value: rpccs.listenEndpoint.ChainID}, {Key: "APIInterface", Value: rpccs.listenEndpoint.ApiInterface}, {Key: "relayProcessor", Value: relayProcessor}}...) @@ -285,7 +287,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelays(retries int, initialRelays boo } return false, err } - protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relay) + protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relay, initRelaysDappId, initRelaysConsumerIp) return rpccs.sendRelayWithRetries(ctx, retries, initialRelays, protocolMessage) } @@ -313,7 +315,7 @@ func (rpccs *RPCConsumerServer) SendRelay( return nil, err } - return rpccs.SendParsedRelay(ctx, dappID, consumerIp, analytics, protocolMessage) + return rpccs.SendParsedRelay(ctx, analytics, protocolMessage) } func (rpccs *RPCConsumerServer) ParseRelay( @@ -347,14 +349,12 @@ func (rpccs *RPCConsumerServer) ParseRelay( } relayRequestData := lavaprotocol.NewRelayData(ctx, connectionType, url, []byte(req), seenBlock, reqBlock, rpccs.listenEndpoint.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), chainlib.GetAddon(chainMessage), common.GetExtensionNames(chainMessage.GetExtensions())) - protocolMessage = chainlib.NewProtocolMessage(chainMessage, directiveHeaders, relayRequestData) + protocolMessage = chainlib.NewProtocolMessage(chainMessage, directiveHeaders, relayRequestData, dappID, consumerIp) return protocolMessage, nil } func (rpccs *RPCConsumerServer) SendParsedRelay( ctx context.Context, - dappID string, - consumerIp string, analytics *metrics.RelayMetrics, protocolMessage chainlib.ProtocolMessage, ) (relayResult *common.RelayResult, errRet error) { @@ -364,10 +364,11 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( // asynchronously sends data reliability if necessary relaySentTime := time.Now() - relayProcessor, err := rpccs.ProcessRelaySend(ctx, protocolMessage, dappID, consumerIp, analytics) + relayProcessor, err := rpccs.ProcessRelaySend(ctx, protocolMessage, analytics) if err != nil && !relayProcessor.HasResults() { + userData := protocolMessage.GetUserData() // we can't send anymore, and we don't have any responses - utils.LavaFormatError("failed getting responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key()), utils.LogAttr("userIp", consumerIp), utils.LogAttr("relayProcessor", relayProcessor)) + utils.LavaFormatError("failed getting responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", 
rpccs.listenEndpoint.Key()), utils.LogAttr("userIp", userData.ConsumerIp), utils.LogAttr("relayProcessor", relayProcessor)) return nil, err } @@ -382,7 +383,7 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( if found { dataReliabilityContext = utils.WithUniqueIdentifier(dataReliabilityContext, guid) } - go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, dappID, consumerIp, protocolMessage, dataReliabilityThreshold, relayProcessor) // runs asynchronously + go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, protocolMessage, dataReliabilityThreshold, relayProcessor) // runs asynchronously } returnedResult, err := relayProcessor.ProcessingResult() @@ -406,11 +407,11 @@ func (rpccs *RPCConsumerServer) GetChainIdAndApiInterface() (string, string) { return rpccs.listenEndpoint.ChainID, rpccs.listenEndpoint.ApiInterface } -func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMessage chainlib.ProtocolMessage, dappID string, consumerIp string, analytics *metrics.RelayMetrics) (*RelayProcessor, error) { +func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMessage chainlib.ProtocolMessage, analytics *metrics.RelayMetrics) (*RelayProcessor, error) { // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) var err error // try sending a relay 3 times. if failed return the error for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { @@ -418,7 +419,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe if analytics != nil && retryFirstRelayAttempt > 0 { analytics = nil } - err = rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessor, analytics) + err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, analytics) // check if we had an error. if we did, try again. if err == nil { @@ -485,7 +486,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe return relayProcessor, nil } // otherwise continue sending another relay - err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, dappID, consumerIp, relayProcessor, nil) + err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, relayProcessor, nil) go validateReturnCondition(err) go readResultsFromProcessor() // increase number of retries launched only if we still have pairing available, if we exhausted the list we don't want to break early @@ -498,7 +499,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe if relayProcessor.ShouldRetry(numberOfRetriesLaunched) { // limit the number of retries called from the new batch ticker flow. // if we pass the limit we just wait for the relays we sent to return. 
- err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, dappID, consumerIp, relayProcessor, nil) + err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, relayProcessor, nil) go validateReturnCondition(err) // add ticker launch metrics go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.GetChainIdAndApiInterface()) @@ -521,8 +522,8 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe // in case we got a processing timeout we return context deadline exceeded to the user. utils.LavaFormatWarning("Relay Got processingCtx timeout", nil, utils.LogAttr("processingTimeout", processingTimeout), - utils.LogAttr("dappId", dappID), - utils.LogAttr("consumerIp", consumerIp), + utils.LogAttr("dappId", relayProcessor.userData.DappId), + utils.LogAttr("consumerIp", relayProcessor.userData.ConsumerIp), utils.LogAttr("protocolMessage.GetApi().Name", protocolMessage.GetApi().Name), utils.LogAttr("GUID", ctx), utils.LogAttr("relayProcessor", relayProcessor), @@ -553,8 +554,6 @@ func (rpccs *RPCConsumerServer) CancelSubscriptionContext(subscriptionKey string func (rpccs *RPCConsumerServer) sendRelayToProvider( ctx context.Context, protocolMessage chainlib.ProtocolMessage, - dappID string, - consumerIp string, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics, ) (errRet error) { @@ -569,9 +568,10 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // if necessary send detection tx for hashes consensus mismatch // handle QoS updates // in case connection totally fails, update unresponsive providers in ConsumerSessionManager + userData := protocolMessage.GetUserData() var sharedStateId string // defaults to "", if shared state is disabled then no shared state will be used. if rpccs.sharedState { - sharedStateId = rpccs.consumerConsistency.Key(dappID, consumerIp) // use same key as we use for consistency, (for better consistency :-D) + sharedStateId = rpccs.consumerConsistency.Key(userData.DappId, userData.ConsumerIp) // use same key as we use for consistency, (for better consistency :-D) } privKey := rpccs.privKey @@ -612,7 +612,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( utils.LavaFormatDebug("shared state seen block is newer", utils.LogAttr("cache_seen_block", cacheSeenBlock), utils.LogAttr("local_seen_block", protocolMessage.RelayPrivateData().SeenBlock)) protocolMessage.RelayPrivateData().SeenBlock = cacheSeenBlock // setting the fetched seen block from the cache server to our local cache as well. 
- rpccs.consumerConsistency.SetSeenBlock(cacheSeenBlock, dappID, consumerIp) + rpccs.consumerConsistency.SetSeenBlock(cacheSeenBlock, userData.DappId, userData.ConsumerIp) } // handle cache reply @@ -659,7 +659,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( if err != nil { if lavasession.PairingListEmptyError.Is(err) { if addon != "" { - return utils.LavaFormatError("No Providers For Addon", err, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("userIp", consumerIp)) + return utils.LavaFormatError("No Providers For Addon", err, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("userIp", userData.ConsumerIp)) } else if len(extensions) > 0 && relayProcessor.GetAllowSessionDegradation() { // if we have no providers for that extension, use a regular provider, otherwise return the extension results sessions, err = rpccs.consumerSessionManager.GetSessions(ctx, chainlib.GetComputeUnits(protocolMessage), usedProviders, reqBlock, addon, []*spectypes.Extension{}, chainlib.GetStateful(protocolMessage), virtualEpoch) if err != nil { @@ -778,7 +778,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } // unique per dappId and ip - consumerToken := common.GetUniqueToken(dappID, consumerIp) + consumerToken := common.GetUniqueToken(userData.DappId, userData.ConsumerIp) processingTimeout, expectedRelayTimeoutForQOS := rpccs.getProcessingTimeout(protocolMessage) deadline, ok := ctx.Deadline() if ok { // we have ctx deadline. we cant go past it. @@ -1191,7 +1191,7 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h return &reply, nil } -func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, dappID string, consumerIp string, chainMessage chainlib.ChainMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { +func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, chainMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(chainMessage) // Wait another relayTimeout duration to maybe get additional relay results if relayProcessor.usedProviders.CurrentlyUsed() > 0 { @@ -1231,10 +1231,11 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context } relayResult := results[0] if len(results) < 2 { + userData := chainMessage.GetUserData() relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) - protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) - err := rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessorDataReliability, nil) + protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData, userData.DappId, userData.ConsumerIp) + relayProcessorDataReliability := NewRelayProcessor(ctx, 
relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + err := rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) } From c2dff9ce69a827b3b4cca247faf3e5a1bf22e104 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 22 Aug 2024 18:21:25 +0200 Subject: [PATCH 02/17] fix test --- protocol/chainlib/consumer_ws_subscription_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index c1a029ee93..dab816b378 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -690,7 +690,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(ctx context.Context, dappID string, consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) { + DoAndReturn(func(ctx context.Context, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) { wg.Done() return relayResult2, nil }). From bc2afb71bff2e5089e841758c688a6ebd5e1839f Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 26 Aug 2024 11:05:43 +0200 Subject: [PATCH 03/17] feat: PRT - adding relay processor retry options --- protocol/common/cobra_common.go | 6 +++- protocol/rpcconsumer/relay_processor.go | 19 ++++++++---- protocol/rpcconsumer/relay_processor_test.go | 31 ++++++++++++-------- protocol/rpcconsumer/rpcconsumer.go | 5 ++++ protocol/rpcconsumer/rpcconsumer_server.go | 14 +++++---- 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index d7daa95e59..389e323f08 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -32,6 +32,8 @@ const ( // Disable relay retries when we get node errors. // This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains. DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error" + SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error" + DisableCacheOnNodeErrorFlag = "disable-cache-on-node-error" UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain ) @@ -56,7 +58,9 @@ type ConsumerCmdFlags struct { DebugRelays bool // enables debug mode for relays DisableConflictTransactions bool // disable conflict transactions DisableRetryOnNodeErrors bool // disable retries on node errors - OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. + SetRelayCountOnNodeError int + DisableCacheOnNodeError bool + OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. } // default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time. 
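The hunks that follow wire these two flags through the stack: cobra_common.go exposes them on ConsumerCmdFlags, rpcconsumer.go registers and reads them, rpcconsumer_server.go packs them into the new retryProcessorOptions struct, and relay_processor.go consults them before retrying a relay that hit a node error. A minimal sketch of that decision flow, assuming the field names introduced in this patch (disableRelayRetry, relayCountOnNodeError, disableCacheOnNodeError) and using a plain map as a hypothetical stand-in for the RelayRetriesManager hash cache; this is illustrative only and not part of the patch:

package main

import "fmt"

// retryProcessorOptions mirrors the struct added to relay_processor.go below.
type retryProcessorOptions struct {
	relayCountOnNodeError   int
	disableCacheOnNodeError bool
	disableRelayRetry       bool
}

// hashCache is a hypothetical stand-in for the RelayRetriesManager cache of
// request hashes that already exhausted their retry budget.
type hashCache map[string]struct{}

// shouldRetry sketches the retry decision: retry only when retries are enabled,
// there are no successful results yet, the node-error count is within the
// configured budget, and (unless cache checks are disabled) the request hash is
// not already known to fail.
func shouldRetry(opts retryProcessorOptions, cache hashCache, successes, nodeErrors int, requestHash string) bool {
	if opts.disableRelayRetry || successes > 0 {
		return false
	}
	if nodeErrors > opts.relayCountOnNodeError {
		return false
	}
	if !opts.disableCacheOnNodeError {
		if _, seen := cache[requestHash]; seen {
			return false // this request already exhausted its retries before
		}
	}
	return true
}

func main() {
	opts := retryProcessorOptions{relayCountOnNodeError: 2} // 2 matches the flag default registered below
	cache := hashCache{}
	fmt.Println(shouldRetry(opts, cache, 0, 1, "abc")) // true: within the retry budget
	cache["abc"] = struct{}{}
	fmt.Println(shouldRetry(opts, cache, 0, 1, "abc")) // false: hash cached as failing
}

With the defaults registered below (retry count 2, cache enabled), a relay that keeps returning node errors is retried a bounded number of times, and its hash can then be cached so identical requests skip further retries; --disable-cache-on-node-error removes that second guard.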
diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index f18b276663..8f671f90ed 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -59,8 +59,14 @@ type RelayProcessor struct { allowSessionDegradation uint32 // used in the scenario where extension was previously used. metricsInf MetricsInterface chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter - disableRelayRetry bool relayRetriesManager *RelayRetriesManager + retryOptions retryProcessorOptions +} + +type retryProcessorOptions struct { + relayCountOnNodeError int + disableCacheOnNodeError bool + disableRelayRetry bool } func NewRelayProcessor( @@ -74,7 +80,7 @@ func NewRelayProcessor( debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, - disableRelayRetry bool, + retryOptions retryProcessorOptions, relayRetriesManager *RelayRetriesManager, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) @@ -100,7 +106,7 @@ func NewRelayProcessor( debugRelay: debugRelay, metricsInf: metricsInf, chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, - disableRelayRetry: disableRelayRetry, + retryOptions: retryOptions, relayRetriesManager: relayRetriesManager, } } @@ -320,12 +326,13 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // 2. If we have 0 successful relays and we have only node errors. // 3. Hash calculation was successful. // 4. Number of retries < NumberOfRetriesAllowedOnNodeErrors. - if !rp.disableRelayRetry && resultsCount == 0 && hashErr == nil { - if nodeErrors <= NumberOfRetriesAllowedOnNodeErrors { + if !rp.retryOptions.disableRelayRetry && resultsCount == 0 && hashErr == nil { + if nodeErrors <= rp.retryOptions.relayCountOnNodeError { // TODO: check chain message retry on archive. 
(this feature will be added in the generic parsers feature) + // Check if user specified to disable caching - OR // Check hash already exist, if it does, we don't want to retry - if !rp.relayRetriesManager.CheckHashInCache(hash) { + if rp.retryOptions.disableCacheOnNodeError || !rp.relayRetriesManager.CheckHashInCache(hash) { // If we didn't find the hash in the hash map we can retry utils.LavaFormatTrace("retrying on relay error", utils.LogAttr("retry_number", nodeErrors), utils.LogAttr("hash", hash)) go rp.metricsInf.SetNodeErrorAttemptMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index bb21a8eda3..70ee97431b 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -16,6 +16,12 @@ import ( "github.com/stretchr/testify/require" ) +var retryOptionsTest = retryProcessorOptions{ + disableRelayRetry: false, + relayCountOnNodeError: 2, + disableCacheOnNodeError: false, +} + type relayProcessorMetricsMock struct{} func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, apiInterface string) {} @@ -104,7 +110,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -146,7 +152,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -188,7 +194,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = 
context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -212,7 +218,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -236,7 +242,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // 4th relay, same inputs, this time a successful relay, should remove the hash from the map chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -285,8 +291,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - relayProcessor.disableRelayRetry = true + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -323,7 +328,7 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -375,7 +380,7 @@ func TestRelayProcessorRetry(t *testing.T) { require.NoError(t, err) chainMsg, err := 
chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -419,7 +424,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -464,7 +469,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -509,7 +514,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -553,7 +558,7 @@ func TestRelayProcessorLatest(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, 
relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index c5b5a32e5b..94fc853b6b 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -584,6 +584,8 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 DebugRelays: viper.GetBool(DebugRelaysFlagName), DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag), DisableRetryOnNodeErrors: viper.GetBool(common.DisableRetryOnNodeErrorsFlag), + SetRelayCountOnNodeError: viper.GetInt(common.SetRelayCountOnNodeErrorFlag), + DisableCacheOnNodeError: viper.GetBool(common.DisableCacheOnNodeErrorFlag), OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag), } @@ -634,6 +636,9 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider") cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain") + cmdRPCConsumer.Flags().Int(common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") + cmdRPCConsumer.Flags().Bool(common.DisableCacheOnNodeErrorFlag, false, "cancel the use of cache to block retry attempts that failed in the past, this will cause every node error to send multiple relays for ever including spam") + common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 595038b5a7..1c7ede5a53 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -73,7 +73,7 @@ type RPCConsumerServer struct { connectedSubscriptionsContexts map[string]*CancelableContextHolder chainListener chainlib.ChainListener connectedSubscriptionsLock sync.RWMutex - disableNodeErrorRetry bool + retryOptions retryProcessorOptions relayRetriesManager *RelayRetriesManager } @@ -124,7 +124,11 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.debugRelays = cmdFlags.DebugRelays rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder) rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10) - rpccs.disableNodeErrorRetry = cmdFlags.DisableRetryOnNodeErrors + rpccs.retryOptions = retryProcessorOptions{ + disableRelayRetry: cmdFlags.DisableRetryOnNodeErrors, + relayCountOnNodeError: cmdFlags.SetRelayCountOnNodeError, + disableCacheOnNodeError: cmdFlags.DisableCacheOnNodeError, + } rpccs.relayRetriesManager = NewRelayRetriesManager() rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager) if err != nil { @@ -226,7 +230,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, 
protocolMessage chainlib.ProtocolMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) for i := 0; i < retries; i++ { err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil) if lavasession.PairingListEmptyError.Is(err) { @@ -410,7 +414,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) var err error // try sending a relay 3 times. if failed return the error for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { @@ -1233,7 +1237,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context if len(results) < 2 { relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) err := rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) From a2f8eef449dab9efad5f8fde6eb57619e925b841 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 26 Aug 2024 11:06:07 +0200 Subject: [PATCH 04/17] rename --- protocol/rpcconsumer/relay_processor.go | 6 +++--- protocol/rpcconsumer/relay_processor_test.go | 2 +- protocol/rpcconsumer/rpcconsumer_server.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 
8f671f90ed..799d84caba 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -60,10 +60,10 @@ type RelayProcessor struct { metricsInf MetricsInterface chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter relayRetriesManager *RelayRetriesManager - retryOptions retryProcessorOptions + retryOptions relayProcessorRetryOptions } -type retryProcessorOptions struct { +type relayProcessorRetryOptions struct { relayCountOnNodeError int disableCacheOnNodeError bool disableRelayRetry bool @@ -80,7 +80,7 @@ func NewRelayProcessor( debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, - retryOptions retryProcessorOptions, + retryOptions relayProcessorRetryOptions, relayRetriesManager *RelayRetriesManager, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 70ee97431b..44822f1dbe 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/require" ) -var retryOptionsTest = retryProcessorOptions{ +var retryOptionsTest = relayProcessorRetryOptions{ disableRelayRetry: false, relayCountOnNodeError: 2, disableCacheOnNodeError: false, diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 1c7ede5a53..a5fe92ff6f 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -73,7 +73,7 @@ type RPCConsumerServer struct { connectedSubscriptionsContexts map[string]*CancelableContextHolder chainListener chainlib.ChainListener connectedSubscriptionsLock sync.RWMutex - retryOptions retryProcessorOptions + retryOptions relayProcessorRetryOptions relayRetriesManager *RelayRetriesManager } @@ -124,7 +124,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.debugRelays = cmdFlags.DebugRelays rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder) rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10) - rpccs.retryOptions = retryProcessorOptions{ + rpccs.retryOptions = relayProcessorRetryOptions{ disableRelayRetry: cmdFlags.DisableRetryOnNodeErrors, relayCountOnNodeError: cmdFlags.SetRelayCountOnNodeError, disableCacheOnNodeError: cmdFlags.DisableCacheOnNodeError, From 35e5e9b8d8176b2276df1bd5d90cdbc2e79ec491 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 26 Aug 2024 17:55:12 +0200 Subject: [PATCH 05/17] improving the features --- protocol/common/cobra_common.go | 5 +--- protocol/rpcconsumer/relay_processor.go | 30 +++++++------------- protocol/rpcconsumer/relay_processor_test.go | 30 ++++++++------------ protocol/rpcconsumer/rpcconsumer.go | 11 ++++--- protocol/rpcconsumer/rpcconsumer_server.go | 12 ++------ 5 files changed, 32 insertions(+), 56 deletions(-) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 389e323f08..c4fbcc5ea0 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -57,10 +57,7 @@ type ConsumerCmdFlags struct { RelaysHealthIntervalFlag time.Duration // interval for relay health check DebugRelays bool // enables debug mode for relays DisableConflictTransactions bool // disable conflict transactions - DisableRetryOnNodeErrors bool // disable retries on node errors - SetRelayCountOnNodeError int - DisableCacheOnNodeError 
bool - OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. + OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. } // default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time. diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 799d84caba..f6d50ba5b1 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -18,16 +18,17 @@ import ( spectypes "github.com/lavanet/lava/v2/x/spec/types" ) -const ( - MaxCallsPerRelay = 50 - NumberOfRetriesAllowedOnNodeErrors = 2 // we will try maximum additional 2 relays on node errors -) - type Selection int const ( - Quorum Selection = iota // get the majority out of requiredSuccesses - BestResult // get the best result, even if it means waiting + Quorum Selection = iota // get the majority out of requiredSuccesses + BestResult // get the best result, even if it means waiting + MaxCallsPerRelay = 50 +) + +var ( + relayCountOnNodeError = 2 + disableRelayRetry = false ) type MetricsInterface interface { @@ -60,13 +61,6 @@ type RelayProcessor struct { metricsInf MetricsInterface chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter relayRetriesManager *RelayRetriesManager - retryOptions relayProcessorRetryOptions -} - -type relayProcessorRetryOptions struct { - relayCountOnNodeError int - disableCacheOnNodeError bool - disableRelayRetry bool } func NewRelayProcessor( @@ -80,7 +74,6 @@ func NewRelayProcessor( debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, - retryOptions relayProcessorRetryOptions, relayRetriesManager *RelayRetriesManager, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) @@ -106,7 +99,6 @@ func NewRelayProcessor( debugRelay: debugRelay, metricsInf: metricsInf, chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, - retryOptions: retryOptions, relayRetriesManager: relayRetriesManager, } } @@ -326,13 +318,13 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // 2. If we have 0 successful relays and we have only node errors. // 3. Hash calculation was successful. // 4. Number of retries < NumberOfRetriesAllowedOnNodeErrors. - if !rp.retryOptions.disableRelayRetry && resultsCount == 0 && hashErr == nil { - if nodeErrors <= rp.retryOptions.relayCountOnNodeError { + if !disableRelayRetry && resultsCount == 0 && hashErr == nil { + if nodeErrors <= relayCountOnNodeError { // TODO: check chain message retry on archive. 
(this feature will be added in the generic parsers feature) // Check if user specified to disable caching - OR // Check hash already exist, if it does, we don't want to retry - if rp.retryOptions.disableCacheOnNodeError || !rp.relayRetriesManager.CheckHashInCache(hash) { + if !rp.relayRetriesManager.CheckHashInCache(hash) { // If we didn't find the hash in the hash map we can retry utils.LavaFormatTrace("retrying on relay error", utils.LogAttr("retry_number", nodeErrors), utils.LogAttr("hash", hash)) go rp.metricsInf.SetNodeErrorAttemptMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 44822f1dbe..0b7d25fd66 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -16,12 +16,6 @@ import ( "github.com/stretchr/testify/require" ) -var retryOptionsTest = relayProcessorRetryOptions{ - disableRelayRetry: false, - relayCountOnNodeError: 2, - disableCacheOnNodeError: false, -} - type relayProcessorMetricsMock struct{} func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, apiInterface string) {} @@ -110,7 +104,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -152,7 +146,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -194,7 +188,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = 
context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -218,7 +212,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -242,7 +236,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // 4th relay, same inputs, this time a successful relay, should remove the hash from the map chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -291,7 +285,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -328,7 +322,7 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -380,7 +374,7 @@ func TestRelayProcessorRetry(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, 
http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -424,7 +418,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -469,7 +463,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -514,7 +508,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -558,7 +552,7 @@ func TestRelayProcessorLatest(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, 
lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 94fc853b6b..c011f4f1eb 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -583,12 +583,13 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag), DebugRelays: viper.GetBool(DebugRelaysFlagName), DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag), - DisableRetryOnNodeErrors: viper.GetBool(common.DisableRetryOnNodeErrorsFlag), - SetRelayCountOnNodeError: viper.GetInt(common.SetRelayCountOnNodeErrorFlag), - DisableCacheOnNodeError: viper.GetBool(common.DisableCacheOnNodeErrorFlag), OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag), } + // set relay processor's global params + relayCountOnNodeError = viper.GetInt(common.SetRelayCountOnNodeErrorFlag) + disableRelayRetry = viper.GetBool(common.DisableRetryOnNodeErrorsFlag) + // validate user is does not provide multi chain setup when using the offline spec feature. if consumerPropagatedFlags.OfflineSpecPath != "" && len(rpcEndpoints) > 1 { utils.LavaFormatFatal("offline spec modifications are supported only in single chain bootstrapping", nil, utils.LogAttr("len(rpcEndpoints)", len(rpcEndpoints)), utils.LogAttr("rpcEndpoints", rpcEndpoints)) @@ -633,11 +634,9 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().BoolVar(&lavasession.DebugProbes, DebugProbesFlagName, false, "adding information to probes") cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.") cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") - cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider") cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain") - + cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider") cmdRPCConsumer.Flags().Int(common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") - cmdRPCConsumer.Flags().Bool(common.DisableCacheOnNodeErrorFlag, false, "cancel the use of cache to block retry attempts that failed in the past, this will cause every node error to send multiple relays for ever including spam") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a5fe92ff6f..bb8a409a69 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -73,7 +73,6 @@ type RPCConsumerServer struct { connectedSubscriptionsContexts map[string]*CancelableContextHolder 
chainListener chainlib.ChainListener connectedSubscriptionsLock sync.RWMutex - retryOptions relayProcessorRetryOptions relayRetriesManager *RelayRetriesManager } @@ -124,11 +123,6 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.debugRelays = cmdFlags.DebugRelays rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder) rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10) - rpccs.retryOptions = relayProcessorRetryOptions{ - disableRelayRetry: cmdFlags.DisableRetryOnNodeErrors, - relayCountOnNodeError: cmdFlags.SetRelayCountOnNodeError, - disableCacheOnNodeError: cmdFlags.DisableCacheOnNodeError, - } rpccs.relayRetriesManager = NewRelayRetriesManager() rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager) if err != nil { @@ -230,7 +224,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, protocolMessage chainlib.ProtocolMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) for i := 0; i < retries; i++ { err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil) if lavasession.PairingListEmptyError.Is(err) { @@ -414,7 +408,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) var err error // try sending a relay 3 times. 
if failed return the error for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { @@ -1237,7 +1231,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context if len(results) < 2 { relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) + relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) err := rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) From 25fd0f01241feef10560056aea91ef09f7fd10f5 Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Mon, 26 Aug 2024 19:30:12 +0300 Subject: [PATCH 06/17] added result manager --- protocol/rpcconsumer/relay_processor.go | 192 +++++----------------- protocol/rpcconsumer/results_manager.go | 209 ++++++++++++++++++++++++ 2 files changed, 249 insertions(+), 152 deletions(-) create mode 100644 protocol/rpcconsumer/results_manager.go diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index e14c6a0a8c..2bf10f26ab 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -15,7 +15,6 @@ import ( "github.com/lavanet/lava/v2/protocol/common" "github.com/lavanet/lava/v2/protocol/lavasession" "github.com/lavanet/lava/v2/utils" - spectypes "github.com/lavanet/lava/v2/x/spec/types" ) const ( @@ -44,9 +43,6 @@ type RelayProcessor struct { usedProviders *lavasession.UsedProviders responses chan *relayResponse requiredSuccesses int - nodeResponseErrors RelayErrors - protocolResponseErrors RelayErrors - successResults []common.RelayResult lock sync.RWMutex protocolMessage chainlib.ProtocolMessage guid uint64 @@ -60,6 +56,7 @@ type RelayProcessor struct { chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter disableRelayRetry bool relayRetriesManager *RelayRetriesManager + ResultsManager } func NewRelayProcessor( @@ -86,8 +83,7 @@ func NewRelayProcessor( usedProviders: usedProviders, requiredSuccesses: requiredSuccesses, responses: make(chan *relayResponse, MaxCallsPerRelay), // we set it as buffered so it is not blocking - nodeResponseErrors: RelayErrors{relayErrors: []RelayError{}}, - protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true}, + ResultsManager: NewResultsManager(guid), protocolMessage: protocolMessage, guid: guid, selection: selection, @@ -133,17 +129,13 @@ func (rp *RelayProcessor) String() string { if rp == nil { return "" } - rp.lock.RLock() - nodeErrors := len(rp.nodeResponseErrors.relayErrors) - 
protocolErrors := len(rp.protocolResponseErrors.relayErrors) - results := len(rp.successResults) - usedProviders := rp.usedProviders - rp.lock.RUnlock() + + usedProviders := rp.GetUsedProviders() currentlyUsedAddresses := usedProviders.CurrentlyUsedAddresses() unwantedAddresses := usedProviders.UnwantedAddresses() - return fmt.Sprintf("relayProcessor {results:%d, nodeErrors:%d, protocolErrors:%d,unwantedAddresses: %s,currentlyUsedAddresses:%s}", - results, nodeErrors, protocolErrors, strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";")) + return fmt.Sprintf("relayProcessor {%s, unwantedAddresses: %s,currentlyUsedAddresses:%s}", + rp.ResultsManager.String(), strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";")) } func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders { @@ -162,34 +154,7 @@ func (rp *RelayProcessor) NodeResults() []common.RelayResult { return nil } rp.readExistingResponses() - rp.lock.RLock() - defer rp.lock.RUnlock() - return rp.nodeResultsInner() -} - -// only when locked -func (rp *RelayProcessor) nodeResultsInner() []common.RelayResult { - // start with results and add to them node results - nodeResults := rp.successResults - nodeResults = append(nodeResults, rp.nodeErrors()...) - return nodeResults -} - -// only when locked -func (rp *RelayProcessor) nodeErrors() (ret []common.RelayResult) { - for _, relayError := range rp.nodeResponseErrors.relayErrors { - ret = append(ret, relayError.response.relayResult) - } - return ret -} - -func (rp *RelayProcessor) ProtocolErrors() uint64 { - if rp == nil { - return 0 - } - rp.lock.RLock() - defer rp.lock.RUnlock() - return uint64(len(rp.protocolResponseErrors.relayErrors)) + return rp.ResultsManager.NodeResults() } func (rp *RelayProcessor) SetResponse(response *relayResponse) { @@ -202,82 +167,12 @@ func (rp *RelayProcessor) SetResponse(response *relayResponse) { rp.responses <- response } -func (rp *RelayProcessor) setValidResponse(response *relayResponse) { - rp.lock.Lock() - defer rp.lock.Unlock() - - // future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply - // we do not modify the chain message data on the consumer, only it's requested block, so we let the provider know it can't put any block height it wants by setting a specific block height - reqBlock, _ := rp.protocolMessage.RequestedBlock() - if reqBlock == spectypes.LATEST_BLOCK { - // TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always - // modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false) - // if !modifiedOnLatestReq { - response.relayResult.Finalized = false // shut down data reliability - // } - } - - if response.relayResult.Reply == nil { - utils.LavaFormatError("got to setValidResponse with nil Reply", - response.err, - utils.LogAttr("ProviderInfo", response.relayResult.ProviderInfo), - utils.LogAttr("StatusCode", response.relayResult.StatusCode), - utils.LogAttr("Finalized", response.relayResult.Finalized), - utils.LogAttr("Quorum", response.relayResult.Quorum), - ) - return - } - // no error, update the seen block - blockSeen := response.relayResult.Reply.LatestBlock - // nil safe - rp.consumerConsistency.SetSeenBlock(blockSeen, rp.userData.DappId, rp.userData.ConsumerIp) - // on subscribe results, we just append to successful results instead of parsing results because we already have a validation. 
- if chainlib.IsFunctionTagOfType(rp.protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { - rp.successResults = append(rp.successResults, response.relayResult) - return - } - - // check response error - foundError, errorMessage := rp.protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode) - if foundError { - // this is a node error, meaning we still didn't get a good response. - // we may choose to wait until there will be a response or timeout happens - // if we decide to wait and timeout happens we will take the majority of response messages - err := fmt.Errorf("%s", errorMessage) - rp.nodeResponseErrors.relayErrors = append(rp.nodeResponseErrors.relayErrors, RelayError{err: err, ProviderInfo: response.relayResult.ProviderInfo, response: response}) - // send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors. - if rp.selection != BestResult { - go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) - utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock)) - } - return - } - rp.successResults = append(rp.successResults, response.relayResult) -} - -func (rp *RelayProcessor) setErrorResponse(response *relayResponse) { - rp.lock.Lock() - defer rp.lock.Unlock() - utils.LavaFormatDebug("could not send relay to provider", utils.Attribute{Key: "GUID", Value: rp.guid}, utils.Attribute{Key: "provider", Value: response.relayResult.ProviderInfo.ProviderAddress}, utils.Attribute{Key: "error", Value: response.err.Error()}) - rp.protocolResponseErrors.relayErrors = append(rp.protocolResponseErrors.relayErrors, RelayError{err: response.err, ProviderInfo: response.relayResult.ProviderInfo, response: response}) -} - func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool { rp.lock.RLock() defer rp.lock.RUnlock() - resultsCount := len(rp.successResults) - if resultsCount >= rp.requiredSuccesses { - // we have enough successes, we can return + if rp.ResultsManager.RequiredResults(rp.requiredSuccesses, rp.selection) { return true } - if rp.selection == Quorum { - // we need a quorum of all node results - nodeErrors := len(rp.nodeResponseErrors.relayErrors) - if nodeErrors+resultsCount >= rp.requiredSuccesses { - // we have enough node results for our quorum - return true - } - } // check if we got all of the responses if responsesCount >= rp.usedProviders.SessionsLatestBatch() { // no active sessions, and we read all the responses, we can return @@ -286,19 +181,6 @@ func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool { return false } -// this function defines if we should use the processor to return the result (meaning it has some insight and responses) or just return to the user -func (rp *RelayProcessor) HasResults() bool { - if rp == nil { - return false - } - rp.lock.RLock() - defer rp.lock.RUnlock() - resultsCount := len(rp.successResults) - nodeErrors := len(rp.nodeResponseErrors.relayErrors) - protocolErrors := len(rp.protocolResponseErrors.relayErrors) - return resultsCount+nodeErrors+protocolErrors > 0 -} - func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { hash, err := rp.protocolMessage.GetRawRequestHash() hashString := "" @@ -350,7 +232,7 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool { } rp.lock.RLock() defer 
rp.lock.RUnlock() - resultsCount := len(rp.successResults) + resultsCount, nodeErrors, _ := rp.GetResults() hash, hashErr := rp.getInputMsgInfoHashString() if resultsCount >= rp.requiredSuccesses { @@ -361,7 +243,6 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool { // Check if we need to add node errors retry metrics if rp.selection == Quorum { // If nodeErrors length is larger than 0, our retry mechanism was activated. we add our metrics now. - nodeErrors := len(rp.nodeResponseErrors.relayErrors) if nodeErrors > 0 { chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface() go rp.metricsInf.SetNodeErrorRecoveredSuccessfullyMetric(chainId, apiInterface, strconv.Itoa(nodeErrors)) @@ -371,7 +252,6 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool { } if rp.selection == Quorum { // We need a quorum of all node results - nodeErrors := len(rp.nodeResponseErrors.relayErrors) if nodeErrors+resultsCount >= rp.requiredSuccesses { // Retry on node error flow: return rp.shouldRetryRelay(resultsCount, hashErr, nodeErrors, hash) @@ -382,13 +262,17 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool { } func (rp *RelayProcessor) handleResponse(response *relayResponse) { - if response == nil { - return + nodeError := rp.ResultsManager.SetResponse(response, rp.protocolMessage) + // send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors. + if nodeError != nil && rp.selection != BestResult { + go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) + utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", nodeError), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name)) } - if response.err != nil { - rp.setErrorResponse(response) - } else { - rp.setValidResponse(response) + + if response != nil && response.relayResult.GetReply().GetLatestBlock() > 0 { + // set consumer consistency when possible + blockSeen := response.relayResult.GetReply().GetLatestBlock() + rp.consumerConsistency.SetSeenBlock(blockSeen, rp.userData.DappId, rp.userData.ConsumerIp) } } @@ -501,54 +385,58 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult rp.lock.RLock() defer rp.lock.RUnlock() + + successResults, nodeErrors, protocolErrors := rp.GetResultsData() + successResultsCount, nodeErrorCount, protocolErrorCount := len(successResults), len(nodeErrors), len(protocolErrors) // there are enough successes - successResultsCount := len(rp.successResults) if successResultsCount >= rp.requiredSuccesses { - return rp.responsesQuorum(rp.successResults, rp.requiredSuccesses) + return rp.responsesQuorum(successResults, rp.requiredSuccesses) } - nodeResults := rp.nodeResultsInner() - // there are not enough successes, let's check if there are enough node errors if rp.debugRelay { // adding as much debug info as possible. 
all successful relays, all node errors and all protocol errors utils.LavaFormatDebug("[Processing Result] Debug Relay", utils.LogAttr("rp.requiredSuccesses", rp.requiredSuccesses)) - utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("len(rp.successResults)", len(rp.successResults)), utils.LogAttr("len(rp.nodeResponseErrors.relayErrors)", len(rp.nodeResponseErrors.relayErrors)), utils.LogAttr("len(rp.protocolResponseErrors.relayErrors)", len(rp.protocolResponseErrors.relayErrors))) - for idx, result := range rp.successResults { + utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("successResultsCount", successResultsCount), utils.LogAttr("nodeErrorCount", nodeErrorCount), utils.LogAttr("protocolErrorCount", protocolErrorCount)) + for idx, result := range successResults { utils.LavaFormatDebug("[Processing Debug] success result", utils.LogAttr("idx", idx), utils.LogAttr("result", result)) } - for idx, result := range rp.nodeResponseErrors.relayErrors { + for idx, result := range nodeErrors { utils.LavaFormatDebug("[Processing Debug] node result", utils.LogAttr("idx", idx), utils.LogAttr("result", result)) } - for idx, result := range rp.protocolResponseErrors.relayErrors { + for idx, result := range protocolErrors { utils.LavaFormatDebug("[Processing Debug] protocol error", utils.LogAttr("idx", idx), utils.LogAttr("result", result)) } } - if len(nodeResults) >= rp.requiredSuccesses { + // there are not enough successes, let's check if there are enough node errors + if successResultsCount+nodeErrorCount >= rp.requiredSuccesses { if rp.selection == Quorum { + nodeResults := make([]common.RelayResult, 0, len(successResults)+len(nodeErrors)) + nodeResults = append(nodeResults, successResults...) + nodeResults = append(nodeResults, nodeErrors...) return rp.responsesQuorum(nodeResults, rp.requiredSuccesses) - } else if rp.selection == BestResult && successResultsCount > len(rp.nodeResponseErrors.relayErrors) { + } else if rp.selection == BestResult && successResultsCount > nodeErrorCount { // we have more than half succeeded, and we are success oriented - return rp.responsesQuorum(rp.successResults, (rp.requiredSuccesses+1)/2) + return rp.responsesQuorum(successResults, (rp.requiredSuccesses+1)/2) } } // we don't have enough for a quorum, prefer a node error on protocol errors - if len(rp.nodeResponseErrors.relayErrors) >= rp.requiredSuccesses { // if we have node errors, we prefer returning them over protocol errors. - nodeErr := rp.nodeResponseErrors.GetBestErrorMessageForUser() + if nodeErrorCount >= rp.requiredSuccesses { // if we have node errors, we prefer returning them over protocol errors. 
+ nodeErr := rp.GetBestNodeErrorMessageForUser() return &nodeErr.response.relayResult, nil } // if we got here we trigger a protocol error returnedResult = &common.RelayResult{StatusCode: http.StatusInternalServerError} - if len(rp.nodeResponseErrors.relayErrors) > 0 { // if we have node errors, we prefer returning them over protocol errors, even if it's just the one - nodeErr := rp.nodeResponseErrors.GetBestErrorMessageForUser() + if nodeErrorCount > 0 { // if we have node errors, we prefer returning them over protocol errors, even if it's just the one + nodeErr := rp.GetBestNodeErrorMessageForUser() processingError = nodeErr.err errorResponse := nodeErr.response if errorResponse != nil { returnedResult = &errorResponse.relayResult } - } else if len(rp.protocolResponseErrors.relayErrors) > 0 { - protocolErr := rp.protocolResponseErrors.GetBestErrorMessageForUser() + } else if protocolErrorCount > 0 { + protocolErr := rp.GetBestProtocolErrorMessageForUser() processingError = protocolErr.err errorResponse := protocolErr.response if errorResponse != nil { diff --git a/protocol/rpcconsumer/results_manager.go b/protocol/rpcconsumer/results_manager.go new file mode 100644 index 0000000000..be8ae40c41 --- /dev/null +++ b/protocol/rpcconsumer/results_manager.go @@ -0,0 +1,209 @@ +package rpcconsumer + +import ( + "fmt" + "sync" + + "github.com/lavanet/lava/v2/protocol/chainlib" + common "github.com/lavanet/lava/v2/protocol/common" + "github.com/lavanet/lava/v2/utils" + spectypes "github.com/lavanet/lava/v2/x/spec/types" +) + +type ResultsManager interface { + String() string + NodeResults() []common.RelayResult + RequiredResults(requiredSuccesses int, selection Selection) bool + ProtocolErrors() uint64 + HasResults() bool + GetResults() (success int, nodeErrors int, protocolErrors int) + GetResultsData() (successResults []common.RelayResult, nodeErrors []common.RelayResult, protocolErrors []RelayError) + SetResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error) + GetBestNodeErrorMessageForUser() RelayError + GetBestProtocolErrorMessageForUser() RelayError +} + +type ResultsManagerInst struct { + nodeResponseErrors RelayErrors + protocolResponseErrors RelayErrors + successResults []common.RelayResult + lock sync.RWMutex + guid uint64 +} + +func NewResultsManager(guid uint64) ResultsManager { + return &ResultsManagerInst{ + guid: guid, + nodeResponseErrors: RelayErrors{relayErrors: []RelayError{}}, + protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true}, + } +} + +func (rp *ResultsManagerInst) setErrorResponse(response *relayResponse) { + rp.lock.Lock() + defer rp.lock.Unlock() + utils.LavaFormatDebug("could not send relay to provider", utils.Attribute{Key: "GUID", Value: rp.guid}, utils.Attribute{Key: "provider", Value: response.relayResult.ProviderInfo.ProviderAddress}, utils.Attribute{Key: "error", Value: response.err.Error()}) + rp.protocolResponseErrors.relayErrors = append(rp.protocolResponseErrors.relayErrors, RelayError{err: response.err, ProviderInfo: response.relayResult.ProviderInfo, response: response}) +} + +// only when locked +func (rp *ResultsManagerInst) nodeResultsInner() []common.RelayResult { + // start with results and add to them node results + nodeResults := rp.successResults + nodeResults = append(nodeResults, rp.nodeErrors()...) 
+ return nodeResults +} + +// only when locked +func (rp *ResultsManagerInst) nodeErrors() (ret []common.RelayResult) { + for _, relayError := range rp.nodeResponseErrors.relayErrors { + ret = append(ret, relayError.response.relayResult) + } + return ret +} + +// returns an error if and only if it was a parsed node error +func (rp *ResultsManagerInst) setValidResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error) { + rp.lock.Lock() + defer rp.lock.Unlock() + + // future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply + // we do not modify the chain message data on the consumer, only it's requested block, so we let the provider know it can't put any block height it wants by setting a specific block height + reqBlock, _ := protocolMessage.RequestedBlock() + if reqBlock == spectypes.LATEST_BLOCK { + // TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always + // modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false) + // if !modifiedOnLatestReq { + response.relayResult.Finalized = false // shut down data reliability + // } + } + + if response.relayResult.Reply == nil { + utils.LavaFormatError("got to setValidResponse with nil Reply", + response.err, + utils.LogAttr("ProviderInfo", response.relayResult.ProviderInfo), + utils.LogAttr("StatusCode", response.relayResult.StatusCode), + utils.LogAttr("Finalized", response.relayResult.Finalized), + utils.LogAttr("Quorum", response.relayResult.Quorum), + ) + return nil + } + + // on subscribe results, we just append to successful results instead of parsing results because we already have a validation. + if chainlib.IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { + rp.successResults = append(rp.successResults, response.relayResult) + return nil + } + + // check response error + foundError, errorMessage := protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode) + if foundError { + // this is a node error, meaning we still didn't get a good response. 
+ // we may choose to wait until there will be a response or timeout happens + // if we decide to wait and timeout happens we will take the majority of response messages + err := fmt.Errorf("%s", errorMessage) + rp.nodeResponseErrors.relayErrors = append(rp.nodeResponseErrors.relayErrors, RelayError{err: err, ProviderInfo: response.relayResult.ProviderInfo, response: response}) + return err + } + rp.successResults = append(rp.successResults, response.relayResult) + return nil +} + +func (rm *ResultsManagerInst) GetResults() (success int, nodeErrors int, protocolErrors int) { + rm.lock.RLock() + defer rm.lock.RUnlock() + + nodeErrors = len(rm.nodeResponseErrors.relayErrors) + protocolErrors = len(rm.protocolResponseErrors.relayErrors) + success = len(rm.successResults) + return success, nodeErrors, protocolErrors +} + +func (rm *ResultsManagerInst) String() string { + results, nodeErrors, protocolErrors := rm.GetResults() + return fmt.Sprintf("resultsManager {success %d, nodeErrors:%d, protocolErrors:%d}", results, nodeErrors, protocolErrors) +} + +// this function returns all results that came from a node, meaning success, and node errors +func (rp *ResultsManagerInst) NodeResults() []common.RelayResult { + if rp == nil { + return nil + } + rp.lock.RLock() + defer rp.lock.RUnlock() + return rp.nodeResultsInner() +} + +func (rp *ResultsManagerInst) ProtocolErrors() uint64 { + if rp == nil { + return 0 + } + rp.lock.RLock() + defer rp.lock.RUnlock() + return uint64(len(rp.protocolResponseErrors.relayErrors)) +} + +func (rp *ResultsManagerInst) RequiredResults(requiredSuccesses int, selection Selection) bool { + if rp == nil { + return false + } + rp.lock.RLock() + defer rp.lock.RUnlock() + resultsCount := len(rp.successResults) + if resultsCount >= requiredSuccesses { + // we have enough successes, we can return + return true + } + if selection == Quorum { + // we need a quorum of all node results + nodeErrors := len(rp.nodeResponseErrors.relayErrors) + if nodeErrors+resultsCount >= requiredSuccesses { + // we have enough node results for our quorum + return true + } + } + return false +} + +// this function defines if we should use the manager to return the result (meaning it has some insight and responses) or just return to the user +func (rp *ResultsManagerInst) HasResults() bool { + if rp == nil { + return false + } + rp.lock.RLock() + defer rp.lock.RUnlock() + resultsCount := len(rp.successResults) + nodeErrors := len(rp.nodeResponseErrors.relayErrors) + protocolErrors := len(rp.protocolResponseErrors.relayErrors) + return resultsCount+nodeErrors+protocolErrors > 0 +} + +func (rp *ResultsManagerInst) SetResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error) { + if response == nil { + return nil + } + if response.err != nil { + rp.setErrorResponse(response) + } else { + return rp.setValidResponse(response, protocolMessage) + } + return nil +} + +func (rp *ResultsManagerInst) GetResultsData() (successResults []common.RelayResult, nodeErrors []common.RelayResult, protocolErrors []RelayError) { + rp.lock.RLock() + defer rp.lock.RUnlock() + return rp.successResults, rp.nodeErrors(), rp.protocolResponseErrors.relayErrors +} + +func (rp *ResultsManagerInst) GetBestNodeErrorMessageForUser() RelayError { + rp.lock.RLock() + defer rp.lock.RUnlock() + return rp.nodeResponseErrors.GetBestErrorMessageForUser() +} + +func (rp *ResultsManagerInst) GetBestProtocolErrorMessageForUser() RelayError { + rp.lock.RLock() + defer rp.lock.RUnlock() + return 
rp.nodeResponseErrors.GetBestErrorMessageForUser() +} From 1e2beafeae4b53564c473565cf313486d7f0a4e1 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 26 Aug 2024 18:47:08 +0200 Subject: [PATCH 07/17] removed user data --- protocol/rpcconsumer/rpcconsumer_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 08a3332100..ebb26997ff 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1239,7 +1239,6 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context } relayResult := results[0] if len(results) < 2 { - userData := chainMessage.GetUserData() relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) // We create new protocol message from the old one, but with a new instance of relay request data. protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) From 4dd4715b0178613ee6cb947da86e1041ca4a6d51 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 26 Aug 2024 19:10:51 +0200 Subject: [PATCH 08/17] fix lint --- protocol/rpcconsumer/rpcconsumer_server.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index ebb26997ff..a778ca8ef2 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1241,7 +1241,12 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context if len(results) < 2 { relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) // We create new protocol message from the old one, but with a new instance of relay request data. 
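Reviewer note: the hunk above, together with the change that follows, rebuilds the protocol message for the data-reliability retry while keeping the caller's identity attached. A minimal sketch of that shape is below; UserData, RelayData, ChainMessage and the constructor arguments are simplified stand-ins, not the real chainlib definitions (the real NewProtocolMessage also takes directive headers).

```go
package main

import "fmt"

// Simplified stand-ins for the chainlib types used in the hunk above.
type UserData struct {
	DappId     string
	ConsumerIp string
}

type RelayData struct {
	ApiUrl       string
	RequestBlock int64
}

type ChainMessage struct {
	ApiName string
}

// ProtocolMessage couples the parsed chain message with its relay data and
// the originating user.
type ProtocolMessage struct {
	ChainMessage
	relayData *RelayData
	userData  UserData
}

func NewProtocolMessage(cm ChainMessage, relayData *RelayData, dappID, consumerIp string) *ProtocolMessage {
	return &ProtocolMessage{ChainMessage: cm, relayData: relayData, userData: UserData{DappId: dappID, ConsumerIp: consumerIp}}
}

func (pm *ProtocolMessage) GetUserData() UserData { return pm.userData }

func main() {
	original := NewProtocolMessage(ChainMessage{ApiName: "eth_getBlockByNumber"}, &RelayData{ApiUrl: "/", RequestBlock: -2}, "my-dapp", "127.0.0.1")

	// Rebuild with fresh relay request data but the same user identity,
	// which is what the data-reliability retry path does.
	ud := original.GetUserData()
	retry := NewProtocolMessage(original.ChainMessage, &RelayData{ApiUrl: "/", RequestBlock: 1234}, ud.DappId, ud.ConsumerIp)
	fmt.Println(retry.GetUserData().DappId, retry.relayData.RequestBlock)
}
```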
- protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) + chainMsg, ok := chainMessage.(chainlib.ChainMessage) + if !ok { + return utils.LavaFormatWarning("failed data reliability relay to provider, failed converting chain message", nil, utils.LogAttr("chainMessage", chainMessage)) + } + userData := chainMessage.GetUserData() + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, relayRequestData, userData.DappId, userData.ConsumerIp) relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) err := rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessorDataReliability, nil) if err != nil { From 74d8e1f4b891f3c94c8c4a4d931e71ab8cc7a1c2 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 26 Aug 2024 19:23:08 +0200 Subject: [PATCH 09/17] fix test --- protocol/rpcconsumer/relay_processor_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 7f8d8531da..cf23e7bf46 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -294,6 +294,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { t.Run("retry_flow_disabled", func(t *testing.T) { ctx := context.Background() + disableRelayRetry = true serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle the incoming request and provide the desired response w.WriteHeader(http.StatusOK) @@ -327,6 +328,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.True(t, resultsOk) requiredNodeResults := relayProcessor.HasRequiredNodeResults() require.True(t, requiredNodeResults) + disableRelayRetry = false }) } From 6be4e5d1e745ba8a9650097b21d8837db1d70f10 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Tue, 27 Aug 2024 12:57:52 +0200 Subject: [PATCH 10/17] merge fix --- protocol/rpcconsumer/results_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/rpcconsumer/results_manager.go b/protocol/rpcconsumer/results_manager.go index be8ae40c41..f634a3f066 100644 --- a/protocol/rpcconsumer/results_manager.go +++ b/protocol/rpcconsumer/results_manager.go @@ -21,6 +21,7 @@ type ResultsManager interface { SetResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error) GetBestNodeErrorMessageForUser() RelayError GetBestProtocolErrorMessageForUser() RelayError + nodeErrors() (ret []common.RelayResult) } type ResultsManagerInst struct { From 330e9b5711d8a1175eba98e2df346c7df148fe6c Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Tue, 27 Aug 2024 13:14:39 +0200 Subject: [PATCH 11/17] risky brola --- protocol/rpcconsumer/results_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcconsumer/results_manager.go b/protocol/rpcconsumer/results_manager.go index f634a3f066..69f421f070 100644 --- a/protocol/rpcconsumer/results_manager.go +++ b/protocol/rpcconsumer/results_manager.go @@ -206,5 +206,5 @@ func (rp *ResultsManagerInst) GetBestNodeErrorMessageForUser() RelayError { func (rp *ResultsManagerInst) GetBestProtocolErrorMessageForUser() RelayError { rp.lock.RLock() defer rp.lock.RUnlock() - return rp.nodeResponseErrors.GetBestErrorMessageForUser() + return rp.protocolResponseErrors.GetBestErrorMessageForUser() } From d73255ffac3a77ecb02a1db1f33b2c9d5c0cfa7d Mon Sep 17 00:00:00 2001 
From: Ran Mishael Date: Wed, 28 Aug 2024 14:46:51 +0200 Subject: [PATCH 12/17] fix conflict test. --- protocol/integration/protocol_test.go | 21 +++++++++++-------- .../lavasession/consumer_session_manager.go | 5 ++++- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 8fece3cd9f..98a48af408 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -1090,12 +1090,12 @@ func TestSameProviderConflictReport(t *testing.T) { twoProvidersConflictSent := false sameProviderConflictSent := false - wg := sync.WaitGroup{} - wg.Add(1) + numberOfRelays := 10 + reported := make(chan bool, numberOfRelays) txConflictDetectionMock := func(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error { if finalizationConflict == nil { require.FailNow(t, "Finalization conflict should not be nil") - wg.Done() + reported <- true return nil } utils.LavaFormatDebug("@@@@@@@@@@@@@@@ Called conflict mock tx", utils.LogAttr("provider0", finalizationConflict.RelayFinalization_0.RelaySession.Provider), utils.LogAttr("provider0", finalizationConflict.RelayFinalization_1.RelaySession.Provider)) @@ -1114,7 +1114,7 @@ func TestSameProviderConflictReport(t *testing.T) { } twoProvidersConflictSent = true - wg.Done() + reported <- true return nil } mockConsumerStateTracker.SetTxConflictDetectionWrapper(txConflictDetectionMock) @@ -1137,16 +1137,19 @@ func TestSameProviderConflictReport(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "http://"+consumerListenAddress+"/cosmos/tx/v1beta1/txs", nil) require.NoError(t, err) - for i := 0; i < 2; i++ { + for i := 0; i < numberOfRelays; i++ { // Two relays to trigger both same provider conflict and - resp, err := client.Do(req) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) + go func() { + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + }() } // conflict calls happen concurrently, therefore we need to wait the call. - wg.Wait() + <-reported require.True(t, sameProviderConflictSent) require.True(t, twoProvidersConflictSent) + }) } diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 9b806a4d69..397c1301f8 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -832,6 +832,8 @@ func (csm *ConsumerSessionManager) removeAddressFromValidAddresses(address strin // Blocks a provider making him unavailable for pick this epoch, will also report him as unavailable if reportProvider is set to true. // Validates that the sessionEpoch is equal to cs.currentEpoch otherwise doesn't take effect. 
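Reviewer note on the test rework above: it replaces the WaitGroup with a buffered channel so the conflict-detection mock can signal from any of the concurrent relays without blocking, and the test only waits for the first report. A minimal sketch of that waiting pattern follows; doRelay and the counter are stand-ins, not the real test harness.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	const numberOfRelays = 10
	// Buffered so every concurrent relay can signal without blocking.
	reported := make(chan bool, numberOfRelays)

	var conflictsSeen atomic.Int32
	doRelay := func(i int) {
		time.Sleep(time.Duration(i) * time.Millisecond) // pretend work
		conflictsSeen.Add(1)
		reported <- true // in the test, the conflict-detection mock sends this
	}

	for i := 0; i < numberOfRelays; i++ {
		go doRelay(i)
	}

	// Unlike wg.Wait(), only the first signal is needed; later sends just sit
	// in the buffer until the test tears down.
	<-reported
	fmt.Println("first conflict reported; seen so far:", conflictsSeen.Load())
}
```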
func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider bool, sessionEpoch uint64, disconnections uint64, errors uint64, allowSecondChance bool, reconnectCallback func() error, errorsForReport []error) error { + utils.LavaFormatDebug("CSM Blocking provider", utils.LogAttr("address", address), utils.LogAttr("errorsForReport", errorsForReport), utils.LogAttr("allowing_second_chance", allowSecondChance)) + // find Index of the address if sessionEpoch != csm.atomicReadCurrentEpoch() { // we read here atomically so cs.currentEpoch cant change in the middle, so we can save time if epochs mismatch return EpochMismatchError @@ -847,10 +849,10 @@ func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider go func() { <-time.After(retrySecondChanceAfter) // check epoch is still relevant, if not just return - utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address)) if sessionEpoch != csm.atomicReadCurrentEpoch() { return } + utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address)) csm.validateAndReturnBlockedProviderToValidAddressesList(address) }() } @@ -967,6 +969,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu if err != nil { return err } + if !redemptionSession && blockProvider { publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch() err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, consecutiveErrors, allowSecondChance, nil, errorsForConsumerSession) From fa99d61a634da4cb87fb7df5ddb6abfa4df28b87 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 28 Aug 2024 15:18:45 +0200 Subject: [PATCH 13/17] fixed all comments --- protocol/common/cobra_common.go | 2 -- protocol/rpcconsumer/relay_processor.go | 18 +++++++------ protocol/rpcconsumer/rpcconsumer.go | 8 +----- protocol/rpcconsumer/rpcconsumer_server.go | 30 +++++++++++----------- 4 files changed, 26 insertions(+), 32 deletions(-) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index c4fbcc5ea0..719b3bbb75 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -31,9 +31,7 @@ const ( DisableConflictTransactionsFlag = "disable-conflict-transactions" // disable conflict transactions, this will hard the network's data reliability and therefore will harm the service. // Disable relay retries when we get node errors. // This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains. 
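Reviewer note on the blockProvider change earlier in this chunk: the second-chance goroutine now re-checks the epoch before logging and restoring the provider, so stale entries from a previous pairing are silently dropped. A reduced sketch of that shape, using a plain atomic counter in place of the session manager's epoch tracking and valid-address bookkeeping:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type manager struct {
	currentEpoch atomic.Uint64
	valid        chan string // stand-in for the valid-addresses list
}

// scheduleSecondChance restores a blocked provider after a delay, but only if
// the epoch it was blocked in is still current; otherwise pairing has already
// been refreshed and the entry is stale, so we return without logging.
func (m *manager) scheduleSecondChance(address string, blockedEpoch uint64, after time.Duration) {
	go func() {
		<-time.After(after)
		if blockedEpoch != m.currentEpoch.Load() {
			return
		}
		fmt.Println("running second chance for provider", address)
		m.valid <- address
	}()
}

func main() {
	m := &manager{valid: make(chan string, 1)}
	m.currentEpoch.Store(7)
	m.scheduleSecondChance("lava@provider1", 7, 10*time.Millisecond)
	fmt.Println("restored:", <-m.valid)
}
```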
- DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error" SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error" - DisableCacheOnNodeErrorFlag = "disable-cache-on-node-error" UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain ) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 1188226332..322f8e112d 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -21,14 +21,17 @@ import ( type Selection int const ( - Quorum Selection = iota // get the majority out of requiredSuccesses - BestResult // get the best result, even if it means waiting MaxCallsPerRelay = 50 ) var ( relayCountOnNodeError = 2 - disableRelayRetry = false +) + +// selection Enum, do not add other const +const ( + Quorum Selection = iota // get the majority out of requiredSuccesses + BestResult // get the best result, even if it means waiting ) type MetricsInterface interface { @@ -309,11 +312,10 @@ func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { // Deciding wether we should send a relay retry attempt based on the node error func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, nodeErrors int, hash string) bool { // Retries will be performed based on the following scenarios: - // 1. rp.disableRelayRetry == false, In case we want to try again if we have a node error. - // 2. If we have 0 successful relays and we have only node errors. - // 3. Hash calculation was successful. - // 4. Number of retries < NumberOfRetriesAllowedOnNodeErrors. - if !disableRelayRetry && resultsCount == 0 && hashErr == nil { + // 1. If we have 0 successful relays and we have only node errors. + // 2. Hash calculation was successful. + // 3. Number of retries < relayCountOnNodeError. + if resultsCount == 0 && hashErr == nil { if nodeErrors <= relayCountOnNodeError { // TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature) diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index c011f4f1eb..36efe2f96a 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -586,10 +586,6 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag), } - // set relay processor's global params - relayCountOnNodeError = viper.GetInt(common.SetRelayCountOnNodeErrorFlag) - disableRelayRetry = viper.GetBool(common.DisableRetryOnNodeErrorsFlag) - // validate user is does not provide multi chain setup when using the offline spec feature. 
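Reviewer note on the Selection const block split out above: under Quorum, node errors may complete the required tally, while BestResult keeps waiting for an actual success. The counting rule from the results manager, restated as a standalone function for clarity (simplified; the real method also takes the manager's read lock):

```go
package main

import "fmt"

type Selection int

const (
	Quorum     Selection = iota // get the majority out of requiredSuccesses
	BestResult                  // get the best result, even if it means waiting
)

// requiredResults mirrors the manager's rule: plain successes always count,
// and under Quorum node errors may fill out the required tally as well.
func requiredResults(successes, nodeErrors, requiredSuccesses int, selection Selection) bool {
	if successes >= requiredSuccesses {
		return true
	}
	if selection == Quorum && successes+nodeErrors >= requiredSuccesses {
		return true
	}
	return false
}

func main() {
	fmt.Println(requiredResults(1, 2, 3, Quorum))     // true: node errors complete the quorum
	fmt.Println(requiredResults(1, 2, 3, BestResult)) // false: still waiting for a success
}
```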
if consumerPropagatedFlags.OfflineSpecPath != "" && len(rpcEndpoints) > 1 { utils.LavaFormatFatal("offline spec modifications are supported only in single chain bootstrapping", nil, utils.LogAttr("len(rpcEndpoints)", len(rpcEndpoints)), utils.LogAttr("rpcEndpoints", rpcEndpoints)) @@ -635,9 +631,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.") cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain") - cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider") - cmdRPCConsumer.Flags().Int(common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") - + cmdRPCConsumer.Flags().IntVar(&relayCountOnNodeError, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a778ca8ef2..5a868805d4 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1199,8 +1199,8 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h return &reply, nil } -func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, chainMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { - processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(chainMessage) +func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, protocolMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { + processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(protocolMessage) // Wait another relayTimeout duration to maybe get additional relay results if relayProcessor.usedProviders.CurrentlyUsed() > 0 { time.Sleep(expectedRelayTimeout) @@ -1208,7 +1208,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - specCategory := chainMessage.GetApi().Category + specCategory := protocolMessage.GetApi().Category if !specCategory.Deterministic { return nil // disabled for this spec and requested block so no data reliability messages } @@ -1229,7 +1229,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context return nil } - reqBlock, _ := chainMessage.RequestedBlock() + reqBlock, _ := protocolMessage.RequestedBlock() if reqBlock <= spectypes.NOT_APPLICABLE { if reqBlock <= spectypes.LATEST_BLOCK { return utils.LavaFormatError("sendDataReliabilityRelayIfApplicable latest requestBlock", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "RequestBlock", Value: reqBlock}) @@ -1239,16 +1239,16 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context } relayResult := 
results[0] if len(results) < 2 { - relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) + relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, protocolMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) // We create new protocol message from the old one, but with a new instance of relay request data. - chainMsg, ok := chainMessage.(chainlib.ChainMessage) + chainMsg, ok := protocolMessage.(chainlib.ChainMessage) if !ok { - return utils.LavaFormatWarning("failed data reliability relay to provider, failed converting chain message", nil, utils.LogAttr("chainMessage", chainMessage)) + return utils.LavaFormatWarning("failed data reliability relay to provider, failed converting chain message", nil, utils.LogAttr("chainMessage", protocolMessage)) } - userData := chainMessage.GetUserData() - protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, relayRequestData, userData.DappId, userData.ConsumerIp) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) - err := rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessorDataReliability, nil) + userData := protocolMessage.GetUserData() + dataReliabilityProtocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, relayRequestData, userData.DappId, userData.ConsumerIp) + relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) + err := rpccs.sendRelayToProvider(ctx, dataReliabilityProtocolMessage, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) } @@ -1275,21 +1275,21 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context for i := 0; i < len(results)-1; i++ { relayResult := results[i] relayResultDataReliability := results[i+1] - conflict := lavaprotocol.VerifyReliabilityResults(ctx, &relayResult, &relayResultDataReliability, chainMessage.GetApiCollection(), rpccs.chainParser) + conflict := lavaprotocol.VerifyReliabilityResults(ctx, &relayResult, &relayResultDataReliability, protocolMessage.GetApiCollection(), rpccs.chainParser) if conflict != nil { // TODO: remove this check when we fix the missing extensions information on conflict detection transaction - if len(chainMessage.GetExtensions()) == 0 { + if len(protocolMessage.GetExtensions()) == 0 { err := rpccs.consumerTxSender.TxConflictDetection(ctx, nil, conflict, relayResultDataReliability.ConflictHandler) if err != nil { utils.LavaFormatError("could not send detection Transaction", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "conflict", Value: conflict}) } if rpccs.reporter 
!= nil { - utils.LavaFormatDebug("sending conflict report to BE", utils.LogAttr("conflicting api", chainMessage.GetApi().Name)) + utils.LavaFormatDebug("sending conflict report to BE", utils.LogAttr("conflicting api", protocolMessage.GetApi().Name)) rpccs.reporter.AppendConflict(metrics.NewConflictRequest(relayResult.Request, relayResult.Reply, relayResultDataReliability.Request, relayResultDataReliability.Reply)) } } } else { - utils.LavaFormatDebug("[+] verified relay successfully with data reliability", utils.LogAttr("api", chainMessage.GetApi().Name)) + utils.LavaFormatDebug("[+] verified relay successfully with data reliability", utils.LogAttr("api", protocolMessage.GetApi().Name)) } } return nil From 788fe73051099709de87872cf7e6d63e284dbcf4 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 28 Aug 2024 15:25:15 +0200 Subject: [PATCH 14/17] llinty --- protocol/rpcconsumer/relay_processor.go | 4 +--- protocol/rpcconsumer/relay_processor_test.go | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 322f8e112d..152ea76aeb 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -24,9 +24,7 @@ const ( MaxCallsPerRelay = 50 ) -var ( - relayCountOnNodeError = 2 -) +var relayCountOnNodeError = 2 // selection Enum, do not add other const const ( diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index cf23e7bf46..a323614428 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -294,7 +294,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { t.Run("retry_flow_disabled", func(t *testing.T) { ctx := context.Background() - disableRelayRetry = true + relayCountOnNodeError = 0 serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle the incoming request and provide the desired response w.WriteHeader(http.StatusOK) @@ -328,7 +328,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.True(t, resultsOk) requiredNodeResults := relayProcessor.HasRequiredNodeResults() require.True(t, requiredNodeResults) - disableRelayRetry = false + relayCountOnNodeError = 2 }) } From 85840df1fbbd947108d33c85f239f7732a9d897e Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 28 Aug 2024 15:31:23 +0200 Subject: [PATCH 15/17] fix lint --- protocol/integration/protocol_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 98a48af408..e53463335a 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -1149,7 +1149,6 @@ func TestSameProviderConflictReport(t *testing.T) { <-reported require.True(t, sameProviderConflictSent) require.True(t, twoProvidersConflictSent) - }) } From 06c3916f7d1c2dcd2dbd2af3d89ed73cf8c1429b Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 28 Aug 2024 16:10:42 +0200 Subject: [PATCH 16/17] bugberan --- protocol/rpcconsumer/relay_processor.go | 2 +- protocol/rpcconsumer/rpcconsumer_server.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 152ea76aeb..3ab22627b3 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -313,7 +313,7 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount 
int, hashErr error, node // 1. If we have 0 successful relays and we have only node errors. // 2. Hash calculation was successful. // 3. Number of retries < relayCountOnNodeError. - if resultsCount == 0 && hashErr == nil { + if relayCountOnNodeError > 0 && resultsCount == 0 && hashErr == nil { if nodeErrors <= relayCountOnNodeError { // TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 5a868805d4..48a6d82e2d 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1240,13 +1240,9 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context relayResult := results[0] if len(results) < 2 { relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, protocolMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) - // We create new protocol message from the old one, but with a new instance of relay request data. - chainMsg, ok := protocolMessage.(chainlib.ChainMessage) - if !ok { - return utils.LavaFormatWarning("failed data reliability relay to provider, failed converting chain message", nil, utils.LogAttr("chainMessage", protocolMessage)) - } userData := protocolMessage.GetUserData() - dataReliabilityProtocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, relayRequestData, userData.DappId, userData.ConsumerIp) + // We create new protocol message from the old one, but with a new instance of relay request data. + dataReliabilityProtocolMessage := chainlib.NewProtocolMessage(protocolMessage, nil, relayRequestData, userData.DappId, userData.ConsumerIp) relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) err := rpccs.sendRelayToProvider(ctx, dataReliabilityProtocolMessage, relayProcessorDataReliability, nil) if err != nil { From 8605702036161d3f14a9bc12f5abea027579e24a Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 28 Aug 2024 16:11:33 +0200 Subject: [PATCH 17/17] fix comment --- protocol/rpcconsumer/relay_processor.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 3ab22627b3..7d66aae584 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -310,9 +310,10 @@ func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { // Deciding wether we should send a relay retry attempt based on the node error func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, nodeErrors int, hash string) bool { // Retries will be performed based on the following scenarios: - // 1. If we have 0 successful relays and we have only node errors. - // 2. Hash calculation was successful. - // 3. Number of retries < relayCountOnNodeError. + // 1. If relayCountOnNodeError > 0 + // 2. If we have 0 successful relays and we have only node errors. + // 3. Hash calculation was successful. + // 4. Number of retries < relayCountOnNodeError. 
if relayCountOnNodeError > 0 && resultsCount == 0 && hashErr == nil { if nodeErrors <= relayCountOnNodeError { // TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature)
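With that final guard in place, setting the retry count to zero is what disables the flow, replacing the boolean flag removed earlier in the series; per the diff, relayCountOnNodeError is bound straight to the "set-retry-count-on-node-error" flag via IntVar. Reduced to its counting conditions, the gate looks roughly like the sketch below; shouldRetry is a simplified stand-in for shouldRetryRelay, whose real body continues beyond this excerpt.

```go
package main

import "fmt"

// Package-level retry budget, bound to the CLI flag in the real code.
var relayCountOnNodeError = 2

func shouldRetry(resultsCount, nodeErrors int, hashErr error) bool {
	// 1. retries enabled, 2. no successful results yet,
	// 3. the request hash was computable, 4. still under the retry budget.
	return relayCountOnNodeError > 0 &&
		resultsCount == 0 &&
		hashErr == nil &&
		nodeErrors <= relayCountOnNodeError
}

func main() {
	fmt.Println(shouldRetry(0, 1, nil)) // true: first node error, retry
	relayCountOnNodeError = 0
	fmt.Println(shouldRetry(0, 1, nil)) // false: retries disabled
}
```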