Skip to content

Commit

Permalink
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into add-optimizer-tiers
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Sep 11, 2024
2 parents 2b116a2 + 41a11f1 commit ba38a7c
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 31 deletions.
5 changes: 3 additions & 2 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lavanet/lava/v3/protocol/common"
"github.com/lavanet/lava/v3/protocol/metrics"
"github.com/lavanet/lava/v3/utils"
"github.com/lavanet/lava/v3/utils/rand"
spectypes "github.com/lavanet/lava/v3/x/spec/types"
)

Expand Down Expand Up @@ -85,6 +86,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

webSocketCtx, cancelWebSocketCtx := context.WithCancel(context.Background())
guid := utils.GenerateUniqueIdentifier()
guidString := strconv.FormatUint(guid, 10)
webSocketCtx = utils.WithUniqueIdentifier(webSocketCtx, guid)
utils.LavaFormatDebug("consumer websocket manager started", utils.LogAttr("GUID", webSocketCtx))
defer func() {
Expand All @@ -110,7 +112,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

for {
startTime := time.Now()
msgSeed := logger.GetMessageSeed()
msgSeed := guidString + "_" + strconv.Itoa(rand.Intn(10000000000)) // use message seed with original guid and new int

utils.LavaFormatTrace("listening for new message from the websocket")

Expand All @@ -132,7 +134,6 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}

msgSeed = strconv.FormatUint(guid, 10)
userIp := websocketConn.RemoteAddr().String()

logFormattedMsg := string(msg)
Expand Down
76 changes: 58 additions & 18 deletions protocol/chainlib/node_error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"
"io"
"net"
"os"
"net/url"
"regexp"
"strings"
"syscall"

"github.com/goccy/go-json"

Expand All @@ -23,24 +23,64 @@ import (
type genericErrorHandler struct{}

func (geh *genericErrorHandler) handleConnectionError(err error) error {
if err == net.ErrWriteToConnected {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Write to connected connection", nil)
} else if err == net.ErrClosed {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Operation on closed connection", nil)
} else if err == io.EOF {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: End of input stream reached", nil)
} else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Network operation timed out", nil)
} else if _, ok := err.(*net.DNSError); ok {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: DNS resolution failed", nil)
} else if opErr, ok := err.(*net.OpError); ok {
if sysErr, ok := opErr.Err.(*os.SyscallError); ok && sysErr.Err == syscall.ECONNREFUSED {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Connection refused", nil)
// Generic error message
genericMsg := "Provider Side Failed Sending Message"

switch {
case err == net.ErrWriteToConnected:
return utils.LavaFormatProduction(genericMsg+", Reason: Write to connected connection", nil)
case err == net.ErrClosed:
return utils.LavaFormatProduction(genericMsg+", Reason: Operation on closed connection", nil)
case err == io.EOF:
return utils.LavaFormatProduction(genericMsg+", Reason: End of input stream reached", nil)
case strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client"):
return utils.LavaFormatProduction(genericMsg+", Reason: misconfigured http endpoint as https", nil)
}

if opErr, ok := err.(*net.OpError); ok {
switch {
case opErr.Timeout():
return utils.LavaFormatProduction(genericMsg+", Reason: Network operation timed out", nil)
case strings.Contains(opErr.Error(), "connection refused"):
return utils.LavaFormatProduction(genericMsg+", Reason: Connection refused", nil)
default:
// Handle other OpError cases without exposing specific details
return utils.LavaFormatProduction(genericMsg+", Reason: Network operation error", nil)
}
}
if urlErr, ok := err.(*url.Error); ok {
switch {
case urlErr.Timeout():
return utils.LavaFormatProduction(genericMsg+", Reason: url.Error issue", nil)
case strings.Contains(urlErr.Error(), "connection refused"):
return utils.LavaFormatProduction(genericMsg+", Reason: Connection refused", nil)
}
} else if strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client") {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: misconfigured http endpoint as https", nil)
}
return nil // do not return here so the caller will return the error inside the data so it reaches the user when it doesn't match any specific cases

if _, ok := err.(*net.DNSError); ok {
return utils.LavaFormatProduction(genericMsg+", Reason: DNS resolution failed", nil)
}

// Mask IP addresses and potential secrets in the error message, and check if any secret was found
maskedError, foundSecret := maskSensitiveInfo(err.Error())
if foundSecret {
// Log or handle the case when a secret was found, if necessary
utils.LavaFormatProduction(genericMsg+maskedError, nil)
}
return nil
}

// ipv4AddressRegex matches dotted-quad IPv4 addresses. Compiled once at
// package init instead of on every call — this helper sits on the error
// handling path and was recompiling the pattern per invocation.
var ipv4AddressRegex = regexp.MustCompile(`\b(?:\d{1,3}\.){3}\d{1,3}\b`)

// maskSensitiveInfo replaces every IPv4 address embedded in errMsg with the
// placeholder "[IP_ADDRESS]". It returns the (possibly rewritten) message and
// a bool reporting whether anything was masked.
func maskSensitiveInfo(errMsg string) (string, bool) {
	if !ipv4AddressRegex.MatchString(errMsg) {
		return errMsg, false
	}
	return ipv4AddressRegex.ReplaceAllString(errMsg, "[IP_ADDRESS]"), true
}

func (geh *genericErrorHandler) handleGenericErrors(ctx context.Context, nodeError error) error {
Expand Down
22 changes: 22 additions & 0 deletions protocol/chainlib/node_error_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package chainlib

import (
"bytes"
"context"
"errors"
"io"
"net"
"net/http"
"os"
"syscall"
"testing"
"time"

"github.com/lavanet/lava/v3/utils"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,3 +81,22 @@ func TestNodeErrorHandlerGenericErrors(t *testing.T) {
err = neh.handleGenericErrors(ctx, errors.New("dummy error"))
require.Equal(t, err, nil)
}

// TestNodeErrorHandlerTimeout checks that the error produced by a timed-out
// HTTP request is sanitized by handleGenericErrors, so the target URL (which
// could reveal a node's address) does not leak into the returned error text.
func TestNodeErrorHandlerTimeout(t *testing.T) {
	httpClient := &http.Client{
		Timeout: 5 * time.Minute, // we are doing a timeout by request
	}
	// A 1ms deadline guarantees the request below fails with a context timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	msgBuffer := bytes.NewBuffer([]byte{1, 2, 3})
	req, err := http.NewRequestWithContext(ctx, "test", "http://0.0.0.0:6789", msgBuffer)
	require.NoError(t, err)
	_, err = httpClient.Do(req)
	require.Error(t, err)
	utils.LavaFormatDebug(err.Error())
	genericHandler := genericErrorHandler{}
	bctx := context.Background()
	// NOTE(review): assumes handleGenericErrors returns a non-nil error for a
	// timeout; ret.Error() below would panic on nil — confirm.
	ret := genericHandler.handleGenericErrors(bctx, err)
	utils.LavaFormatDebug(ret.Error())
	// The original request URL must have been masked out of the error text.
	require.NotContains(t, ret.Error(), "http://0.0.0.0:6789")
}
126 changes: 126 additions & 0 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,132 @@ func TestSameProviderConflictBasicResponseCheck(t *testing.T) {
}
}

// TestArchiveProvidersRetry exercises the consumer's retry behavior against
// mock providers that advertise the "archive" addon: as long as at least one
// provider replies successfully the consumer should return the success
// payload, and only when every provider node-errors should the node error
// (and its status code) be propagated to the user.
func TestArchiveProvidersRetry(t *testing.T) {
	playbook := []struct {
		name               string
		numOfProviders     int // total providers in the pairing list
		archiveProviders   int // how many of them advertise the "archive" addon
		nodeErrorProviders int // how many of them reply with a node error
		expectedResult     string
		statusCode         int
	}{
		{
			name:               "happy flow",
			numOfProviders:     3,
			archiveProviders:   3,
			nodeErrorProviders: 0,
			expectedResult:     `{"result": "success"}`,
			statusCode:         200,
		},
		{
			name:               "archive with 1 errored provider",
			numOfProviders:     3,
			archiveProviders:   3,
			nodeErrorProviders: 1,
			expectedResult:     `{"result": "success"}`,
			statusCode:         200,
		},
		{
			name:               "archive with 2 errored provider",
			numOfProviders:     3,
			archiveProviders:   3,
			nodeErrorProviders: 2,
			expectedResult:     `{"result": "success"}`,
			statusCode:         200,
		},
		{
			name:               "archive with 3 errored provider",
			numOfProviders:     3,
			archiveProviders:   3,
			nodeErrorProviders: 3,
			expectedResult:     `{"error": "failure", "message": "test", "code": "-32132"}`,
			statusCode:         555,
		},
	}
	for _, play := range playbook {
		t.Run(play.name, func(t *testing.T) {
			ctx := context.Background()
			// can be any spec and api interface
			specId := "LAV1"
			apiInterface := spectypes.APIInterfaceRest
			epoch := uint64(100)
			requiredResponses := 1
			lavaChainID := "lava"
			numProviders := play.numOfProviders

			consumerListenAddress := addressGen.GetAddress()
			pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{}

			// providerData bundles everything createRpcProvider returns for one mock provider.
			type providerData struct {
				account                sigs.Account
				endpoint               *lavasession.RPCProviderEndpoint
				server                 *rpcprovider.RPCProviderServer
				replySetter            *ReplySetter
				mockChainFetcher       *MockChainFetcher
				mockReliabilityManager *MockReliabilityManager
			}
			providers := []providerData{}

			for i := 0; i < numProviders; i++ {
				account := sigs.GenerateDeterministicFloatingKey(randomizer)
				providerDataI := providerData{account: account}
				providers = append(providers, providerDataI)
			}
			consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer)

			for i := 0; i < numProviders; i++ {
				ctx := context.Background()
				providerDataI := providers[i]
				listenAddress := addressGen.GetAddress()
				addons := []string(nil)
				// The first archiveProviders providers advertise the archive addon.
				if i+1 <= play.archiveProviders {
					addons = []string{"archive"}
				}
				providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, consumerAccount.Addr.String(), specId, apiInterface, listenAddress, providerDataI.account, lavaChainID, addons, fmt.Sprintf("provider%d", i))
				providers[i].replySetter.replyDataBuf = []byte(`{"result": "success"}`)
				// The first nodeErrorProviders providers reply with a node error instead.
				if i+1 <= play.nodeErrorProviders {
					providers[i].replySetter.replyDataBuf = []byte(`{"error": "failure", "message": "test", "code": "-32132"}`)
					providers[i].replySetter.status = 555
				}
			}

			for i := 0; i < numProviders; i++ {
				pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{
					PublicLavaAddress: providers[i].account.Addr.String(),
					Endpoints: []*lavasession.Endpoint{
						{
							NetworkAddress: providers[i].endpoint.NetworkAddress.Address,
							Enabled:        true,
							Geolocation:    1,
						},
					},
					Sessions:         map[int64]*lavasession.SingleConsumerSession{},
					MaxComputeUnits:  10000,
					UsedComputeUnits: 0,
					PairingEpoch:     epoch,
				}
			}
			rpcconsumerServer, _ := createRpcConsumer(t, ctx, specId, apiInterface, consumerAccount, consumerListenAddress, epoch, pairingList, requiredResponses, lavaChainID)
			require.NotNil(t, rpcconsumerServer)

			client := http.Client{Timeout: 1000 * time.Millisecond}
			req, err := http.NewRequest(http.MethodGet, "http://"+consumerListenAddress+"/lavanet/lava/conflict/params", nil)
			// Check the construction error before touching req; the original
			// dereferenced a possibly-nil req to set the header first.
			require.NoError(t, err)
			// Request the archive extension explicitly.
			req.Header["lava-extension"] = []string{"archive"}

			resp, err := client.Do(req)
			require.NoError(t, err)
			// Deferred so the body is closed even if an assertion below fails.
			defer resp.Body.Close()
			require.Equal(t, play.statusCode, resp.StatusCode)

			bodyBytes, err := io.ReadAll(resp.Body)
			require.NoError(t, err)

			// testify's Equal takes (t, expected, actual); the original passed
			// them swapped, which produces misleading failure diffs.
			require.Equal(t, play.expectedResult, string(bodyBytes))
		})
	}
}

func TestSameProviderConflictReport(t *testing.T) {
type providerData struct {
account sigs.Account
Expand Down
22 changes: 12 additions & 10 deletions protocol/rpcconsumer/consumer_relay_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend

readResultsFromProcessor := func() {
// ProcessResults is reading responses while blocking until the conditions are met
utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", batchNumber))
crsm.parentRelayProcessor.WaitForResults(processingCtx)
// Decide if we need to resend or not
if crsm.parentRelayProcessor.HasRequiredNodeResults() {
Expand All @@ -139,15 +140,11 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
returnCondition := make(chan error, 1)
// Used for checking whether to return an error to the user or to allow other channels return their result first see detailed description on the switch case below
validateReturnCondition := func(err error) {
currentlyUsedIsEmptyCounter := 0
for validateNoProvidersAreUsed := 0; validateNoProvidersAreUsed < numberOfTimesToCheckCurrentlyUsedIsEmpty; validateNoProvidersAreUsed++ {
if crsm.usedProviders.CurrentlyUsed() == 0 {
currentlyUsedIsEmptyCounter++
}
time.Sleep(5 * time.Millisecond)
}
// we failed to send a batch of relays, if there are no active sends we can terminate after validating X amount of times to make sure no racing channels
if currentlyUsedIsEmptyCounter >= numberOfTimesToCheckCurrentlyUsedIsEmpty {
batchOnStart := batchNumber
time.Sleep(15 * time.Millisecond)
utils.LavaFormatTrace("[StateMachine] validating return condition", utils.LogAttr("batch", batchNumber))
if batchOnStart == crsm.usedProviders.BatchNumber() && crsm.usedProviders.CurrentlyUsed() == 0 {
utils.LavaFormatTrace("[StateMachine] return condition triggered", utils.LogAttr("batch", batchNumber), utils.LogAttr("err", err))
returnCondition <- err
}
}
Expand All @@ -162,13 +159,13 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
startNewBatchTicker := time.NewTicker(relayTimeout) // Every relay timeout we send a new batch
defer startNewBatchTicker.Stop()
consecutiveBatchErrors := 0

// Start the relay state machine
for {
select {
// Getting batch update for either errors sending message or successful batches
case err := <-crsm.batchUpdate:
if err != nil { // Error handling
utils.LavaFormatTrace("[StateMachine] err := <-crsm.batchUpdate", utils.LogAttr("err", err), utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
// Sending a new batch failed (consumer's protocol side), handling the state machine
consecutiveBatchErrors++ // Increase consecutive error counter
if consecutiveBatchErrors > SendRelayAttempts { // If we failed sending a message more than "SendRelayAttempts" time in a row.
Expand All @@ -177,6 +174,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
}
go validateReturnCondition(err) // Check if we have ongoing messages pending return.
} else {
utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
// Failed sending message, but we still want to attempt sending more.
relayTaskChannel <- RelayStateSendInstructions{
protocolMessage: crsm.GetProtocolMessage(),
Expand All @@ -198,6 +196,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
return
}
case success := <-gotResults:
utils.LavaFormatTrace("[StateMachine] success := <-gotResults", utils.LogAttr("batch", batchNumber))
// If we had a successful result return what we currently have
// Or we are done sending relays, and we have no other relays pending results.
if success { // Check wether we can return the valid results or we need to send another relay
Expand All @@ -206,6 +205,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
}
// If should retry == true, send a new batch. (success == false)
if crsm.ShouldRetry(batchNumber) {
utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", batchNumber))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
} else {
go validateReturnCondition(nil)
Expand All @@ -214,11 +214,13 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
case <-startNewBatchTicker.C:
// Only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if crsm.ShouldRetry(batchNumber) {
utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", batchNumber))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
// Add ticker launch metrics
go crsm.tickerMetricSetter.SetRelaySentByNewBatchTickerMetric(crsm.relaySender.GetChainIdAndApiInterface())
}
case returnErr := <-returnCondition:
utils.LavaFormatTrace("[StateMachine] returnErr := <-returnCondition", utils.LogAttr("batch", batchNumber))
// we use this channel because there could be a race condition between us releasing the provider and about to send the return
// to an error happening on another relay processor's routine. this can cause an error that returns to the user
// if we don't release the case, it will cause the success case condition to not be executed
Expand Down
Loading

0 comments on commit ba38a7c

Please sign in to comment.