Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PRT-v-3-2-1-hf-provider-delgation-near-experimental #1748

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestQoS(t *testing.T) {
currentLatency := time.Millisecond
expectedLatency := time.Millisecond
latestServicedBlock := expectedBH
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(1), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -292,7 +292,7 @@ func TestQoS(t *testing.T) {
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

latestServicedBlock = expectedBH + 1
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(2), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -302,7 +302,7 @@ func TestQoS(t *testing.T) {
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

singleConsumerSession.QoSInfo.TotalRelays++ // this is how we add a failure
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(3), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -313,7 +313,7 @@ func TestQoS(t *testing.T) {
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

latestServicedBlock = expectedBH - 1 // is one block below threshold
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(5), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -325,7 +325,7 @@ func TestQoS(t *testing.T) {
latestServicedBlock = expectedBH + 1
// add in a loop so availability goes above 95%
for i := 5; i < 100; i++ {
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1, false)
}
require.Equal(t, sdk.MustNewDecFromStr("0.8"), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
Expand Down
3 changes: 2 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
numOfProviders int,
providersCount uint64,
isHangingApi bool,
reduceAvailability bool,
) error {
// release locks, update CU, relaynum etc..
if err := consumerSession.VerifyLock(); err != nil {
Expand All @@ -1040,7 +1041,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.ConsecutiveErrors = []error{}
consumerSession.LatestBlock = latestServicedBlock // update latest serviced block
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount), reduceAvailability)
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
return nil
Expand Down
22 changes: 11 additions & 11 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestHappyFlow(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -416,7 +416,7 @@ func runOnSessionDoneForConsumerSessionMap(t *testing.T, css ConsumerSessionsMap
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestHappyFlowVirtualEpoch(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, maxCuForVirtualEpoch*(virtualEpoch+1))
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, maxCuForVirtualEpoch*(virtualEpoch+1))
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestPairingReset(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
require.Equal(t, epoch, csm.currentEpoch)

if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.CuSum, cuForFirstRequest)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
for j := numberOfAllowedSessionsPerConsumer / 2; j < numberOfAllowedSessionsPerConsumer; j++ {
cs := sessionList[j].cs
if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, sessionListData[j].cuSum+cuForFirstRequest, cs.CuSum)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand All @@ -676,7 +676,7 @@ func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *test
for _, cs := range css {
require.NotNil(t, cs)
time.Sleep(time.Duration((rand.Intn(500) + 1)) * time.Millisecond)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
ch <- p
}
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestPairingWithAddons(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func TestPairingWithExtensions(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1068,7 +1068,7 @@ func TestPairingWithStateful(t *testing.T) {
require.NoError(t, err)
require.Equal(t, allProviders, len(css))
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
usedProviders := NewUsedProviders(nil)
Expand Down
4 changes: 2 additions & 2 deletions protocol/lavasession/end_to_end_lavasession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) {
err = psm.OnSessionDone(sps, cs.Session.RelayNum-skippedRelays)
require.NoError(t, err)

err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func prepareSessionsWithFirstRelay(t *testing.T, cuForFirstRequest uint64) (*Con
require.NoError(t, err)

// Consumer Side:
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down
8 changes: 5 additions & 3 deletions protocol/lavasession/single_consumer_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (cs *SingleConsumerSession) getQosComputedResultOrZero() sdk.Dec {
return sdk.ZeroDec()
}

func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64) {
func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64, reduceAvailability bool) {
// Add current Session QoS
cs.QoSInfo.TotalRelays++ // increase total relays
cs.QoSInfo.AnsweredRelays++ // increase answered relays
cs.QoSInfo.TotalRelays++ // increase total relays
if !reduceAvailability { // incase we want to reduce availability to this provider due to some reason we skip answered.
cs.QoSInfo.AnsweredRelays++ // increase answered relays
}

if cs.QoSInfo.LastQoSReport == nil {
cs.QoSInfo.LastQoSReport = &pairingtypes.QualityOfServiceReport{}
Expand Down
21 changes: 20 additions & 1 deletion protocol/rpcconsumer/consumer_relay_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/lavanet/lava/v3/protocol/chainlib"
"github.com/lavanet/lava/v3/protocol/chainlib/extensionslib"
common "github.com/lavanet/lava/v3/protocol/common"
lavasession "github.com/lavanet/lava/v3/protocol/lavasession"
"github.com/lavanet/lava/v3/protocol/metrics"
Expand All @@ -26,6 +27,7 @@ type ConsumerRelaySender interface {
sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error)
getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration)
GetChainIdAndApiInterface() (string, string)
GetExtensionParser() *extensionslib.ExtensionParser
}

type tickerMetricSetterInf interface {
Expand Down Expand Up @@ -116,14 +118,29 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
go func() {
// A channel to be notified processing was done, true means we have results and can return
gotResults := make(chan bool, 1)
processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(crsm.GetProtocolMessage())
protocolMessage := crsm.GetProtocolMessage()
processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(protocolMessage)
if crsm.debugRelays {
utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout))
}
// Create the processing timeout prior to entering the method so it wont reset every time
processingCtx, processingCtxCancel := context.WithTimeout(crsm.ctx, processingTimeout)
defer processingCtxCancel()

apiName := protocolMessage.GetApi().Name
resetUsedOnce := true
setArchiveOnSpecialApi := func() {
if apiName == "tx" || apiName == "chunk" {
archiveExtensionArray := []string{"archive"}
protocolMessage.OverrideExtensions(archiveExtensionArray, crsm.relaySender.GetExtensionParser())
protocolMessage.RelayPrivateData().Extensions = archiveExtensionArray
if resetUsedOnce {
resetUsedOnce = false
crsm.usedProviders = lavasession.NewUsedProviders(protocolMessage)
}
}
}

readResultsFromProcessor := func() {
// ProcessResults is reading responses while blocking until the conditions are met
utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
Expand All @@ -132,6 +149,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
if crsm.parentRelayProcessor.HasRequiredNodeResults() {
gotResults <- true
} else {
setArchiveOnSpecialApi()
gotResults <- false
}
}
Expand Down Expand Up @@ -203,6 +221,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
case <-startNewBatchTicker.C:
// Only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if crsm.ShouldRetry(crsm.usedProviders.BatchNumber()) {
setArchiveOnSpecialApi()
utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
// Add ticker launch metrics
Expand Down
4 changes: 4 additions & 0 deletions protocol/rpcconsumer/consumer_relay_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type ConsumerRelaySenderMock struct {
tickerValue time.Duration
}

func (crsm *ConsumerRelaySenderMock) GetExtensionParser() *extensionslib.ExtensionParser {
return nil
}

func (crsm *ConsumerRelaySenderMock) sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) {
return crsm.retValue
}
Expand Down
Loading
Loading