Skip to content

Commit

Permalink
Merge commit 'e4b610f40c4811f1065f5b10cb8975c200e92148' into PRT-v-3-…
Browse files Browse the repository at this point in the history
…2-1-hf-provider-delgation-near-experimental
  • Loading branch information
ranlavanet committed Oct 17, 2024
2 parents 610f94c + e4b610f commit 12bcb94
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 19 deletions.
3 changes: 2 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
numOfProviders int,
providersCount uint64,
isHangingApi bool,
reduceAvailability bool,
) error {
// release locks, update CU, relaynum etc..
if err := consumerSession.VerifyLock(); err != nil {
Expand All @@ -1040,7 +1041,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.ConsecutiveErrors = []error{}
consumerSession.LatestBlock = latestServicedBlock // update latest serviced block
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount), reduceAvailability)
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
return nil
Expand Down
22 changes: 11 additions & 11 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestHappyFlow(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -416,7 +416,7 @@ func runOnSessionDoneForConsumerSessionMap(t *testing.T, css ConsumerSessionsMap
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestHappyFlowVirtualEpoch(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, maxCuForVirtualEpoch*(virtualEpoch+1))
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, maxCuForVirtualEpoch*(virtualEpoch+1))
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestPairingReset(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
require.Equal(t, epoch, csm.currentEpoch)

if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.CuSum, cuForFirstRequest)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
for j := numberOfAllowedSessionsPerConsumer / 2; j < numberOfAllowedSessionsPerConsumer; j++ {
cs := sessionList[j].cs
if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, sessionListData[j].cuSum+cuForFirstRequest, cs.CuSum)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand All @@ -676,7 +676,7 @@ func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *test
for _, cs := range css {
require.NotNil(t, cs)
time.Sleep(time.Duration((rand.Intn(500) + 1)) * time.Millisecond)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
ch <- p
}
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestPairingWithAddons(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func TestPairingWithExtensions(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1068,7 +1068,7 @@ func TestPairingWithStateful(t *testing.T) {
require.NoError(t, err)
require.Equal(t, allProviders, len(css))
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
usedProviders := NewUsedProviders(nil)
Expand Down
4 changes: 2 additions & 2 deletions protocol/lavasession/end_to_end_lavasession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) {
err = psm.OnSessionDone(sps, cs.Session.RelayNum-skippedRelays)
require.NoError(t, err)

err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func prepareSessionsWithFirstRelay(t *testing.T, cuForFirstRequest uint64) (*Con
require.NoError(t, err)

// Consumer Side:
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down
8 changes: 5 additions & 3 deletions protocol/lavasession/single_consumer_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (cs *SingleConsumerSession) getQosComputedResultOrZero() sdk.Dec {
return sdk.ZeroDec()
}

func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64) {
func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64, reduceAvailability bool) {
// Add current Session QoS
cs.QoSInfo.TotalRelays++ // increase total relays
cs.QoSInfo.AnsweredRelays++ // increase answered relays
cs.QoSInfo.TotalRelays++ // increase total relays
if !reduceAvailability { // incase we want to reduce availability to this provider due to some reason we skip answered.
cs.QoSInfo.AnsweredRelays++ // increase answered relays
}

if cs.QoSInfo.LastQoSReport == nil {
cs.QoSInfo.LastQoSReport = &pairingtypes.QualityOfServiceReport{}
Expand Down
21 changes: 20 additions & 1 deletion protocol/rpcconsumer/consumer_relay_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/lavanet/lava/v3/protocol/chainlib"
"github.com/lavanet/lava/v3/protocol/chainlib/extensionslib"
common "github.com/lavanet/lava/v3/protocol/common"
lavasession "github.com/lavanet/lava/v3/protocol/lavasession"
"github.com/lavanet/lava/v3/protocol/metrics"
Expand All @@ -26,6 +27,7 @@ type ConsumerRelaySender interface {
sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error)
getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration)
GetChainIdAndApiInterface() (string, string)
GetExtensionParser() *extensionslib.ExtensionParser
}

type tickerMetricSetterInf interface {
Expand Down Expand Up @@ -116,14 +118,29 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
go func() {
// A channel to be notified processing was done, true means we have results and can return
gotResults := make(chan bool, 1)
processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(crsm.GetProtocolMessage())
protocolMessage := crsm.GetProtocolMessage()
processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(protocolMessage)
if crsm.debugRelays {
utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout))
}
// Create the processing timeout prior to entering the method so it won't reset every time
processingCtx, processingCtxCancel := context.WithTimeout(crsm.ctx, processingTimeout)
defer processingCtxCancel()

apiName := protocolMessage.GetApi().Name
resetUsedOnce := true
setArchiveOnSpecialApi := func() {
if apiName == "tx" || apiName == "chunk" {
archiveExtensionArray := []string{"archive"}
protocolMessage.OverrideExtensions(archiveExtensionArray, crsm.relaySender.GetExtensionParser())
protocolMessage.RelayPrivateData().Extensions = archiveExtensionArray
if resetUsedOnce {
resetUsedOnce = false
crsm.usedProviders = lavasession.NewUsedProviders(protocolMessage)
}
}
}

readResultsFromProcessor := func() {
// ProcessResults is reading responses while blocking until the conditions are met
utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
Expand All @@ -132,6 +149,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
if crsm.parentRelayProcessor.HasRequiredNodeResults() {
gotResults <- true
} else {
setArchiveOnSpecialApi()
gotResults <- false
}
}
Expand Down Expand Up @@ -203,6 +221,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
case <-startNewBatchTicker.C:
// Only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if crsm.ShouldRetry(crsm.usedProviders.BatchNumber()) {
setArchiveOnSpecialApi()
utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
// Add ticker launch metrics
Expand Down
4 changes: 4 additions & 0 deletions protocol/rpcconsumer/consumer_relay_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type ConsumerRelaySenderMock struct {
tickerValue time.Duration
}

// GetExtensionParser is the mock's extension-parser accessor.
// It always returns nil, so callers that receive this value get a nil *extensionslib.ExtensionParser.
func (crsm *ConsumerRelaySenderMock) GetExtensionParser() *extensionslib.ExtensionParser {
return nil
}

// sendRelayToProvider is the mock's relay-sending method.
// It ignores ctx, protocolMessage, relayProcessor and analytics, and simply
// returns the retValue stored on the mock.
func (crsm *ConsumerRelaySenderMock) sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) {
return crsm.retValue
}
Expand Down
11 changes: 10 additions & 1 deletion protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ func (rpccs *RPCConsumerServer) CancelSubscriptionContext(subscriptionKey string
}
}

// GetExtensionParser returns the extension parser obtained from the server's
// chain parser by delegating to rpccs.chainParser.ExtensionsParser().
func (rpccs *RPCConsumerServer) GetExtensionParser() *extensionslib.ExtensionParser {
return rpccs.chainParser.ExtensionsParser()
}

func (rpccs *RPCConsumerServer) sendRelayToProvider(
ctx context.Context,
protocolMessage chainlib.ProtocolMessage,
Expand Down Expand Up @@ -757,8 +761,13 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
)
}

errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi) // session done successfully
isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode)
reduceAvailability := false
if isNodeError {
// validate nodeError is matching our expectations for reducing availability.
reduceAvailability = strings.Contains(string(localRelayResult.Reply.Data), "The node does not track the shard ID")
}
errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi, reduceAvailability) // session done successfully
localRelayResult.IsNodeError = isNodeError
if rpccs.cache.CacheActive() && rpcclient.ValidateStatusCodes(localRelayResult.StatusCode, true) == nil {
// in case the error is a node error we don't want to cache
Expand Down

0 comments on commit 12bcb94

Please sign in to comment.