From bb1173fe4a33f3a3db572df9cfdc8011e2a6ebec Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Tue, 3 Sep 2024 17:31:39 +0200 Subject: [PATCH] fixing batch update issue and adding unitests --- .../consumer_relay_state_machine.go | 3 + .../consumer_relay_state_machine_test.go | 98 ++++++++++++++++++- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/protocol/rpcconsumer/consumer_relay_state_machine.go b/protocol/rpcconsumer/consumer_relay_state_machine.go index 967b5e0a27..06dbd68252 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine.go @@ -62,6 +62,7 @@ func NewRelayStateMachine( selection: selection, debugRelays: debugRelays, tickerMetricSetter: tickerMetricSetter, + batchUpdate: make(chan error), } } @@ -229,5 +230,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend } func (crsm *ConsumerRelayStateMachine) UpdateBatch(err error) { + utils.LavaFormatDebug("updating batch before") crsm.batchUpdate <- err + utils.LavaFormatDebug("updating batch after") } diff --git a/protocol/rpcconsumer/consumer_relay_state_machine_test.go b/protocol/rpcconsumer/consumer_relay_state_machine_test.go index f4e4d0539f..0b387e86e0 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine_test.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine_test.go @@ -1,7 +1,103 @@ package rpcconsumer -import "testing" +import ( + context "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/lavanet/lava/v3/protocol/chainlib" + "github.com/lavanet/lava/v3/protocol/chainlib/extensionslib" + lavasession "github.com/lavanet/lava/v3/protocol/lavasession" + "github.com/lavanet/lava/v3/protocol/metrics" + spectypes "github.com/lavanet/lava/v3/x/spec/types" + "github.com/stretchr/testify/require" +) + +type ConsumerRelaySenderMock struct { + retValue error +} + +func (crsm *ConsumerRelaySenderMock) sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) { + return crsm.retValue +} + +func (crsm *ConsumerRelaySenderMock) getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) { + return time.Second * 50000, 100 * time.Millisecond +} + +func (crsm *ConsumerRelaySenderMock) GetChainIdAndApiInterface() (string, string) { + return "testUno", "testDos" +} func TestConsumerStateMachineHappyFlow(t *testing.T) { + t.Run("happy", func(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + }) + specId := "LAV1" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + dappId := "dapp" + consumerIp := "123.11" + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp) + consistency := NewConsumerConsistency(specId) + + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, consistency, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, &ConsumerRelaySenderMock{retValue: nil}, protocolMessage, nil, false, relayProcessorMetrics)) + + usedProviders := relayProcessor.GetUsedProviders() + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + + relayTaskChannel := relayProcessor.GetRelayTaskChannel() + taskNumber := 0 + for task := range relayTaskChannel { + switch taskNumber { + case 0: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendProtocolError(relayProcessor, "lava@test", time.Millisecond*1, fmt.Errorf("bad")) + case 1: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendNodeError(relayProcessor, "lava2@test", time.Millisecond*1) + case 2: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendNodeError(relayProcessor, "lava2@test", time.Millisecond*1) + case 3: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendSuccessResp(relayProcessor, "lava4@test", time.Millisecond*1) + case 4: + require.True(t, task.IsDone()) + require.True(t, relayProcessor.HasRequiredNodeResults()) + returnedResult, err := relayProcessor.ProcessingResult() + require.NoError(t, err) + require.Equal(t, string(returnedResult.Reply.Data), "ok") + require.Equal(t, http.StatusOK, returnedResult.StatusCode) + return // end test. + } + taskNumber++ + } + }) }