Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor consumer results into a class #1653

Merged
merged 23 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protocol/chainlib/protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
)

type UserData struct {
ConsumerIp string
DappId string
}

type BaseProtocolMessage struct {
ChainMessage
directiveHeaders map[string]string
Expand Down
20 changes: 11 additions & 9 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,12 +1090,12 @@ func TestSameProviderConflictReport(t *testing.T) {

twoProvidersConflictSent := false
sameProviderConflictSent := false
wg := sync.WaitGroup{}
wg.Add(1)
numberOfRelays := 10
reported := make(chan bool, numberOfRelays)
txConflictDetectionMock := func(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error {
if finalizationConflict == nil {
require.FailNow(t, "Finalization conflict should not be nil")
wg.Done()
reported <- true
return nil
}
utils.LavaFormatDebug("@@@@@@@@@@@@@@@ Called conflict mock tx", utils.LogAttr("provider0", finalizationConflict.RelayFinalization_0.RelaySession.Provider), utils.LogAttr("provider0", finalizationConflict.RelayFinalization_1.RelaySession.Provider))
Expand All @@ -1114,7 +1114,7 @@ func TestSameProviderConflictReport(t *testing.T) {
}

twoProvidersConflictSent = true
wg.Done()
reported <- true
return nil
}
mockConsumerStateTracker.SetTxConflictDetectionWrapper(txConflictDetectionMock)
Expand All @@ -1137,14 +1137,16 @@ func TestSameProviderConflictReport(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "http://"+consumerListenAddress+"/cosmos/tx/v1beta1/txs", nil)
require.NoError(t, err)

for i := 0; i < 2; i++ {
for i := 0; i < numberOfRelays; i++ {
// Two relays to trigger both same provider conflict and
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
go func() {
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}()
}
// conflict calls happen concurrently, therefore we need to wait the call.
wg.Wait()
<-reported
require.True(t, sameProviderConflictSent)
require.True(t, twoProvidersConflictSent)
})
Expand Down
5 changes: 4 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ func (csm *ConsumerSessionManager) removeAddressFromValidAddresses(address strin
// Blocks a provider making him unavailable for pick this epoch, will also report him as unavailable if reportProvider is set to true.
// Validates that the sessionEpoch is equal to cs.currentEpoch otherwise doesn't take effect.
func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider bool, sessionEpoch uint64, disconnections uint64, errors uint64, allowSecondChance bool, reconnectCallback func() error, errorsForReport []error) error {
utils.LavaFormatDebug("CSM Blocking provider", utils.LogAttr("address", address), utils.LogAttr("errorsForReport", errorsForReport), utils.LogAttr("allowing_second_chance", allowSecondChance))

// find Index of the address
if sessionEpoch != csm.atomicReadCurrentEpoch() { // we read here atomically so cs.currentEpoch cant change in the middle, so we can save time if epochs mismatch
return EpochMismatchError
Expand All @@ -847,10 +849,10 @@ func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider
go func() {
<-time.After(retrySecondChanceAfter)
// check epoch is still relevant, if not just return
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
if sessionEpoch != csm.atomicReadCurrentEpoch() {
return
}
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
csm.validateAndReturnBlockedProviderToValidAddressesList(address)
}()
}
Expand Down Expand Up @@ -967,6 +969,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
if err != nil {
return err
}

if !redemptionSession && blockProvider {
publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch()
err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, consecutiveErrors, allowSecondChance, nil, errorsForConsumerSession)
Expand Down
Loading
Loading