Skip to content

Commit

Permalink
Merge branch 'main' into PRT-provider-node-error-retry-mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Sep 1, 2024
2 parents 8fb6e94 + bd46dac commit 811da28
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 167 deletions.
3 changes: 2 additions & 1 deletion protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addo

// Return an error if spec does not exist
if !ok {
return nil, utils.LavaFormatWarning("api not supported", common.APINotSupportedError, utils.Attribute{Key: "connectionType", Value: connectionType})
utils.LavaFormatDebug("api not supported", utils.Attribute{Key: "connectionType", Value: connectionType})
return nil, common.APINotSupportedError
}

// Return an error if api is disabled
Expand Down
3 changes: 2 additions & 1 deletion protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

protocolMessage, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, metricsData, nil)
if err != nil {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not parse message", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
utils.LavaFormatDebug("ws manager could not parse message", utils.LogAttr("message", msg), utils.LogAttr("err", err))
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), err, msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: formatterMsg}
}
Expand Down
5 changes: 5 additions & 0 deletions protocol/chainlib/protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
)

type UserData struct {
ConsumerIp string
DappId string
}

type BaseProtocolMessage struct {
ChainMessage
directiveHeaders map[string]string
Expand Down
3 changes: 2 additions & 1 deletion protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ func (apip *RestChainParser) getSupportedApi(name, connectionType string) (*ApiC

// Return an error if spec does not exist
if !ok {
return nil, utils.LavaFormatWarning("rest api not supported", common.APINotSupportedError,
utils.LavaFormatDebug("rest api not supported",
utils.LogAttr("name", name),
utils.LogAttr("connectionType", connectionType),
)
return nil, common.APINotSupportedError
}
api := apiCont.api

Expand Down
1 change: 0 additions & 1 deletion protocol/chainlib/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func TestRestGetSupportedApi(t *testing.T) {
_, err = apip.getSupportedApi("API2", connectionType_test)
assert.Error(t, err)
assert.ErrorIs(t, err, common.APINotSupportedError)
assert.Equal(t, "rest api not supported ErrMsg: api not supported {name:API2,connectionType:test}: api not supported", err.Error())

// Test case 3: Returns error if the API is disabled
apip = &RestChainParser{
Expand Down
20 changes: 11 additions & 9 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,12 +1090,12 @@ func TestSameProviderConflictReport(t *testing.T) {

twoProvidersConflictSent := false
sameProviderConflictSent := false
wg := sync.WaitGroup{}
wg.Add(1)
numberOfRelays := 10
reported := make(chan bool, numberOfRelays)
txConflictDetectionMock := func(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error {
if finalizationConflict == nil {
require.FailNow(t, "Finalization conflict should not be nil")
wg.Done()
reported <- true
return nil
}
utils.LavaFormatDebug("@@@@@@@@@@@@@@@ Called conflict mock tx", utils.LogAttr("provider0", finalizationConflict.RelayFinalization_0.RelaySession.Provider), utils.LogAttr("provider0", finalizationConflict.RelayFinalization_1.RelaySession.Provider))
Expand All @@ -1114,7 +1114,7 @@ func TestSameProviderConflictReport(t *testing.T) {
}

twoProvidersConflictSent = true
wg.Done()
reported <- true
return nil
}
mockConsumerStateTracker.SetTxConflictDetectionWrapper(txConflictDetectionMock)
Expand All @@ -1137,14 +1137,16 @@ func TestSameProviderConflictReport(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "http://"+consumerListenAddress+"/cosmos/tx/v1beta1/txs", nil)
require.NoError(t, err)

for i := 0; i < 2; i++ {
for i := 0; i < numberOfRelays; i++ {
// Two relays to trigger both same provider conflict and
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
go func() {
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}()
}
// conflict calls happen concurrently, therefore we need to wait the call.
wg.Wait()
<-reported
require.True(t, sameProviderConflictSent)
require.True(t, twoProvidersConflictSent)
})
Expand Down
5 changes: 4 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ func (csm *ConsumerSessionManager) removeAddressFromValidAddresses(address strin
// Blocks a provider making him unavailable for pick this epoch, will also report him as unavailable if reportProvider is set to true.
// Validates that the sessionEpoch is equal to cs.currentEpoch otherwise doesn't take effect.
func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider bool, sessionEpoch uint64, disconnections uint64, errors uint64, allowSecondChance bool, reconnectCallback func() error, errorsForReport []error) error {
utils.LavaFormatDebug("CSM Blocking provider", utils.LogAttr("address", address), utils.LogAttr("errorsForReport", errorsForReport), utils.LogAttr("allowing_second_chance", allowSecondChance))

// find Index of the address
if sessionEpoch != csm.atomicReadCurrentEpoch() { // we read here atomically so cs.currentEpoch cant change in the middle, so we can save time if epochs mismatch
return EpochMismatchError
Expand All @@ -847,10 +849,10 @@ func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider
go func() {
<-time.After(retrySecondChanceAfter)
// check epoch is still relevant, if not just return
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
if sessionEpoch != csm.atomicReadCurrentEpoch() {
return
}
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
csm.validateAndReturnBlockedProviderToValidAddressesList(address)
}()
}
Expand Down Expand Up @@ -967,6 +969,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
if err != nil {
return err
}

if !redemptionSession && blockProvider {
publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch()
err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, consecutiveErrors, allowSecondChance, nil, errorsForConsumerSession)
Expand Down
Loading

0 comments on commit 811da28

Please sign in to comment.