Skip to content

Commit

Permalink
fix: PRT - fix race condition on state machine stateful state
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Sep 9, 2024
1 parent 0c17bba commit 2f9e9e0
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions protocol/rpcconsumer/consumer_relay_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend

readResultsFromProcessor := func() {
// ProcessResults is reading responses while blocking until the conditions are met
utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", batchNumber))
crsm.parentRelayProcessor.WaitForResults(processingCtx)
// Decide if we need to resend or not
if crsm.parentRelayProcessor.HasRequiredNodeResults() {
Expand All @@ -139,15 +140,11 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
returnCondition := make(chan error, 1)
// Used for checking whether to return an error to the user or to allow other channels return their result first see detailed description on the switch case below
validateReturnCondition := func(err error) {
currentlyUsedIsEmptyCounter := 0
for validateNoProvidersAreUsed := 0; validateNoProvidersAreUsed < numberOfTimesToCheckCurrentlyUsedIsEmpty; validateNoProvidersAreUsed++ {
if crsm.usedProviders.CurrentlyUsed() == 0 {
currentlyUsedIsEmptyCounter++
}
time.Sleep(5 * time.Millisecond)
}
// we failed to send a batch of relays, if there are no active sends we can terminate after validating X amount of times to make sure no racing channels
if currentlyUsedIsEmptyCounter >= numberOfTimesToCheckCurrentlyUsedIsEmpty {
batchOnStart := batchNumber
time.Sleep(15 * time.Millisecond)
utils.LavaFormatTrace("[StateMachine] validating return condition", utils.LogAttr("batch", batchNumber))
if batchOnStart == crsm.usedProviders.BatchNumber() && crsm.usedProviders.CurrentlyUsed() == 0 {
utils.LavaFormatTrace("[StateMachine] return condition triggered", utils.LogAttr("batch", batchNumber), utils.LogAttr("err", err))
returnCondition <- err
}
}
Expand All @@ -162,13 +159,13 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
startNewBatchTicker := time.NewTicker(relayTimeout) // Every relay timeout we send a new batch
defer startNewBatchTicker.Stop()
consecutiveBatchErrors := 0

// Start the relay state machine
for {
select {
// Getting batch update for either errors sending message or successful batches
case err := <-crsm.batchUpdate:
if err != nil { // Error handling
utils.LavaFormatTrace("[StateMachine] err := <-crsm.batchUpdate", utils.LogAttr("err", err), utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
// Sending a new batch failed (consumer's protocol side), handling the state machine
consecutiveBatchErrors++ // Increase consecutive error counter
if consecutiveBatchErrors > SendRelayAttempts { // If we failed sending a message more than "SendRelayAttempts" time in a row.
Expand All @@ -177,6 +174,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
}
go validateReturnCondition(err) // Check if we have ongoing messages pending return.
} else {
utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
// Failed sending message, but we still want to attempt sending more.
relayTaskChannel <- RelayStateSendInstructions{
protocolMessage: crsm.GetProtocolMessage(),
Expand All @@ -198,6 +196,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
return
}
case success := <-gotResults:
utils.LavaFormatTrace("[StateMachine] success := <-gotResults", utils.LogAttr("batch", batchNumber))
// If we had a successful result return what we currently have
// Or we are done sending relays, and we have no other relays pending results.
if success { // Check wether we can return the valid results or we need to send another relay
Expand All @@ -206,6 +205,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
}
// If should retry == true, send a new batch. (success == false)
if crsm.ShouldRetry(batchNumber) {
utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", batchNumber))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
} else {
go validateReturnCondition(nil)
Expand All @@ -214,11 +214,13 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
case <-startNewBatchTicker.C:
// Only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if crsm.ShouldRetry(batchNumber) {
utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", batchNumber))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
// Add ticker launch metrics
go crsm.tickerMetricSetter.SetRelaySentByNewBatchTickerMetric(crsm.relaySender.GetChainIdAndApiInterface())
}
case returnErr := <-returnCondition:
utils.LavaFormatTrace("[StateMachine] returnErr := <-returnCondition", utils.LogAttr("batch", batchNumber))
// we use this channel because there could be a race condition between us releasing the provider and about to send the return
// to an error happening on another relay processor's routine. this can cause an error that returns to the user
// if we don't release the case, it will cause the success case condition to not be executed
Expand Down

0 comments on commit 2f9e9e0

Please sign in to comment.