Skip to content

Commit

Permalink
chore: refactor consumer results into a class (#1653)
Browse files Browse the repository at this point in the history
* feat: PRT - dappid and consumer ip added to protocol message.

* fix test

* feat: PRT - adding relay processor retry options

* rename

* improving the features

* added result manager

* removed user data

* fix lint

* fix test

* merge fix

* risky brola

* fix conflict test.

* fixed all comments

* llinty

* fix lint

* bugberan

* fix comment

---------

Co-authored-by: Ran Mishael <[email protected]>
  • Loading branch information
omerlavanet and ranlavanet authored Aug 29, 2024
1 parent 520a81f commit bd46dac
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 163 deletions.
5 changes: 5 additions & 0 deletions protocol/chainlib/protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
)

type UserData struct {
ConsumerIp string
DappId string
}

type BaseProtocolMessage struct {
ChainMessage
directiveHeaders map[string]string
Expand Down
20 changes: 11 additions & 9 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,12 +1090,12 @@ func TestSameProviderConflictReport(t *testing.T) {

twoProvidersConflictSent := false
sameProviderConflictSent := false
wg := sync.WaitGroup{}
wg.Add(1)
numberOfRelays := 10
reported := make(chan bool, numberOfRelays)
txConflictDetectionMock := func(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error {
if finalizationConflict == nil {
require.FailNow(t, "Finalization conflict should not be nil")
wg.Done()
reported <- true
return nil
}
utils.LavaFormatDebug("@@@@@@@@@@@@@@@ Called conflict mock tx", utils.LogAttr("provider0", finalizationConflict.RelayFinalization_0.RelaySession.Provider), utils.LogAttr("provider0", finalizationConflict.RelayFinalization_1.RelaySession.Provider))
Expand All @@ -1114,7 +1114,7 @@ func TestSameProviderConflictReport(t *testing.T) {
}

twoProvidersConflictSent = true
wg.Done()
reported <- true
return nil
}
mockConsumerStateTracker.SetTxConflictDetectionWrapper(txConflictDetectionMock)
Expand All @@ -1137,14 +1137,16 @@ func TestSameProviderConflictReport(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "http://"+consumerListenAddress+"/cosmos/tx/v1beta1/txs", nil)
require.NoError(t, err)

for i := 0; i < 2; i++ {
for i := 0; i < numberOfRelays; i++ {
// Two relays to trigger both same provider conflict and
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
go func() {
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}()
}
// conflict calls happen concurrently, therefore we need to wait the call.
wg.Wait()
<-reported
require.True(t, sameProviderConflictSent)
require.True(t, twoProvidersConflictSent)
})
Expand Down
5 changes: 4 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ func (csm *ConsumerSessionManager) removeAddressFromValidAddresses(address strin
// Blocks a provider making him unavailable for pick this epoch, will also report him as unavailable if reportProvider is set to true.
// Validates that the sessionEpoch is equal to cs.currentEpoch otherwise doesn't take effect.
func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider bool, sessionEpoch uint64, disconnections uint64, errors uint64, allowSecondChance bool, reconnectCallback func() error, errorsForReport []error) error {
utils.LavaFormatDebug("CSM Blocking provider", utils.LogAttr("address", address), utils.LogAttr("errorsForReport", errorsForReport), utils.LogAttr("allowing_second_chance", allowSecondChance))

// find Index of the address
if sessionEpoch != csm.atomicReadCurrentEpoch() { // we read here atomically so cs.currentEpoch cant change in the middle, so we can save time if epochs mismatch
return EpochMismatchError
Expand All @@ -847,10 +849,10 @@ func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider
go func() {
<-time.After(retrySecondChanceAfter)
// check epoch is still relevant, if not just return
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
if sessionEpoch != csm.atomicReadCurrentEpoch() {
return
}
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
csm.validateAndReturnBlockedProviderToValidAddressesList(address)
}()
}
Expand Down Expand Up @@ -967,6 +969,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
if err != nil {
return err
}

if !redemptionSession && blockProvider {
publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch()
err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, consecutiveErrors, allowSecondChance, nil, errorsForConsumerSession)
Expand Down
Loading

0 comments on commit bd46dac

Please sign in to comment.