Skip to content

Commit

Permalink
fixing batch update issue and adding unitests
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Sep 3, 2024
1 parent b95a617 commit bb1173f
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
3 changes: 3 additions & 0 deletions protocol/rpcconsumer/consumer_relay_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func NewRelayStateMachine(
selection: selection,
debugRelays: debugRelays,
tickerMetricSetter: tickerMetricSetter,
batchUpdate: make(chan error),
}
}

Expand Down Expand Up @@ -229,5 +230,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
}

func (crsm *ConsumerRelayStateMachine) UpdateBatch(err error) {
utils.LavaFormatDebug("updating batch before")
crsm.batchUpdate <- err
utils.LavaFormatDebug("updating batch after")
}
98 changes: 97 additions & 1 deletion protocol/rpcconsumer/consumer_relay_state_machine_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,103 @@
package rpcconsumer

import "testing"
import (
context "context"
"fmt"
"net/http"
"testing"
"time"

"github.com/lavanet/lava/v3/protocol/chainlib"
"github.com/lavanet/lava/v3/protocol/chainlib/extensionslib"
lavasession "github.com/lavanet/lava/v3/protocol/lavasession"
"github.com/lavanet/lava/v3/protocol/metrics"
spectypes "github.com/lavanet/lava/v3/x/spec/types"
"github.com/stretchr/testify/require"
)

type ConsumerRelaySenderMock struct {
retValue error
}

func (crsm *ConsumerRelaySenderMock) sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) {
return crsm.retValue
}

func (crsm *ConsumerRelaySenderMock) getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) {
return time.Second * 50000, 100 * time.Millisecond
}

func (crsm *ConsumerRelaySenderMock) GetChainIdAndApiInterface() (string, string) {
return "testUno", "testDos"
}

func TestConsumerStateMachineHappyFlow(t *testing.T) {
t.Run("happy", func(t *testing.T) {
ctx := context.Background()
serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Handle the incoming request and provide the desired response
w.WriteHeader(http.StatusOK)
})
specId := "LAV1"
chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil)
if closeServer != nil {
defer closeServer()
}
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
dappId := "dapp"
consumerIp := "123.11"
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp)
consistency := NewConsumerConsistency(specId)

relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, consistency, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, &ConsumerRelaySenderMock{retValue: nil}, protocolMessage, nil, false, relayProcessorMetrics))

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
canUse := usedProviders.TryLockSelection(ctx)
require.NoError(t, ctx.Err())
require.Nil(t, canUse)
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}}

relayTaskChannel := relayProcessor.GetRelayTaskChannel()
taskNumber := 0
for task := range relayTaskChannel {
switch taskNumber {
case 0:
require.False(t, task.IsDone())
usedProviders.AddUsed(consumerSessionsMap, nil)
relayProcessor.UpdateBatch(nil)
sendProtocolError(relayProcessor, "lava@test", time.Millisecond*1, fmt.Errorf("bad"))
case 1:
require.False(t, task.IsDone())
usedProviders.AddUsed(consumerSessionsMap, nil)
relayProcessor.UpdateBatch(nil)
sendNodeError(relayProcessor, "lava2@test", time.Millisecond*1)
case 2:
require.False(t, task.IsDone())
usedProviders.AddUsed(consumerSessionsMap, nil)
relayProcessor.UpdateBatch(nil)
sendNodeError(relayProcessor, "lava2@test", time.Millisecond*1)
case 3:
require.False(t, task.IsDone())
usedProviders.AddUsed(consumerSessionsMap, nil)
relayProcessor.UpdateBatch(nil)
sendSuccessResp(relayProcessor, "lava4@test", time.Millisecond*1)
case 4:
require.True(t, task.IsDone())
require.True(t, relayProcessor.HasRequiredNodeResults())
returnedResult, err := relayProcessor.ProcessingResult()
require.NoError(t, err)
require.Equal(t, string(returnedResult.Reply.Data), "ok")
require.Equal(t, http.StatusOK, returnedResult.StatusCode)
return // end test.
}

taskNumber++
}
})
}

0 comments on commit bb1173f

Please sign in to comment.