Skip to content

Commit

Permalink
Merge branch 'main' into PRT-add-cache-to-protocol-e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes authored Oct 9, 2024
2 parents 7650b52 + 62e988f commit 329fea7
Show file tree
Hide file tree
Showing 25 changed files with 707 additions and 319 deletions.
37 changes: 37 additions & 0 deletions cookbook/specs/evmos.json
Original file line number Diff line number Diff line change
Expand Up @@ -1590,6 +1590,43 @@
"type": "POST",
"add_on": "debug"
},
"apis": [
{
"name": "debug_getBadBlocks",
"compute_units": 1,
"enabled": false
},
{
"name": "debug_getRawBlock",
"compute_units": 1,
"enabled": false
},
{
"name": "debug_getRawHeader",
"compute_units": 1,
"enabled": false
},
{
"name": "debug_getRawReceipts",
"compute_units": 1,
"enabled": false
},
{
"name": "debug_getRawTransaction",
"compute_units": 1,
"enabled": false
},
{
"name": "debug_storageRangeAt",
"compute_units": 1,
"enabled": false
},
{
"name": "debug_traceCall",
"compute_units": 1,
"enabled": false
}
],
"verifications": [
{
"name": "enabled",
Expand Down
1 change: 1 addition & 0 deletions proto/lavanet/lava/spec/api_collection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ message ParseDirective {
string function_template = 2;
BlockParser result_parsing = 3 [(gogoproto.nullable) = false];
string api_name = 4;
repeated GenericParser parsers = 5 [(gogoproto.nullable) = false];
}

message BlockParser {
Expand Down
112 changes: 58 additions & 54 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,68 +188,71 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon
)
}

parsedResult, err := parser.ParseFromReply(parserInput, parsing.ResultParsing)
if err != nil {
return utils.LavaFormatWarning("[-] verify failed to parse result", err, []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "Response", Value: string(reply.RelayReply.Data)},
}...)
parsedInput := parser.ParseBlockFromReply(parserInput, parsing.ResultParsing, parsing.Parsers)
if parsedInput.GetRawParsedData() == "" {
return utils.LavaFormatWarning("[-] verify failed to parse result", err,
utils.LogAttr("chainId", chainId),
utils.LogAttr("nodeUrl", proxyUrl.Url),
utils.LogAttr("Method", parsing.GetApiName()),
utils.LogAttr("Response", string(reply.RelayReply.Data)),
)
}
if verification.LatestDistance != 0 && latestBlock != 0 && verification.ParseDirective.FunctionTag != spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM {
parsedResultAsNumber, err := strconv.ParseUint(parsedResult, 0, 64)
if err != nil {
return utils.LavaFormatWarning("[-] verify failed to parse result as number", err, []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "Response", Value: string(reply.RelayReply.Data)},
{Key: "parsedResult", Value: parsedResult},
}...)
parsedResultAsNumber := parsedInput.GetBlock()
if parsedResultAsNumber == spectypes.NOT_APPLICABLE {
return utils.LavaFormatWarning("[-] verify failed to parse result as number", err,
utils.LogAttr("chainId", chainId),
utils.LogAttr("nodeUrl", proxyUrl.Url),
utils.LogAttr("Method", parsing.GetApiName()),
utils.LogAttr("Response", string(reply.RelayReply.Data)),
utils.LogAttr("rawParsedData", parsedInput.GetRawParsedData()),
)
}
if parsedResultAsNumber > latestBlock {
return utils.LavaFormatWarning("[-] verify failed parsed result is greater than latestBlock", err, []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "latestBlock", Value: latestBlock},
{Key: "parsedResult", Value: parsedResultAsNumber},
}...)
uint64ParsedResultAsNumber := uint64(parsedResultAsNumber)
if uint64ParsedResultAsNumber > latestBlock {
return utils.LavaFormatWarning("[-] verify failed parsed result is greater than latestBlock", err,
utils.LogAttr("chainId", chainId),
utils.LogAttr("nodeUrl", proxyUrl.Url),
utils.LogAttr("Method", parsing.GetApiName()),
utils.LogAttr("latestBlock", latestBlock),
utils.LogAttr("parsedResult", uint64ParsedResultAsNumber),
)
}
if latestBlock-parsedResultAsNumber < verification.LatestDistance {
return utils.LavaFormatWarning("[-] verify failed expected block distance is not sufficient", err, []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "latestBlock", Value: latestBlock},
{Key: "parsedResult", Value: parsedResultAsNumber},
{Key: "expected", Value: verification.LatestDistance},
}...)
if latestBlock-uint64ParsedResultAsNumber < verification.LatestDistance {
return utils.LavaFormatWarning("[-] verify failed expected block distance is not sufficient", err,
utils.LogAttr("chainId", chainId),
utils.LogAttr("nodeUrl", proxyUrl.Url),
utils.LogAttr("Method", parsing.GetApiName()),
utils.LogAttr("latestBlock", latestBlock),
utils.LogAttr("parsedResult", uint64ParsedResultAsNumber),
utils.LogAttr("expected", verification.LatestDistance),
)
}
}
// some verifications only want the response to be valid, and don't care about the value
if verification.Value != "*" && verification.Value != "" && verification.ParseDirective.FunctionTag != spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM {
if parsedResult != verification.Value {
return utils.LavaFormatWarning("[-] verify failed expected and received are different", err, []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "parsedResult", Value: parsedResult},
{Key: "verification.Value", Value: verification.Value},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "Extension", Value: verification.Extension},
{Key: "Addon", Value: verification.Addon},
{Key: "Verification", Value: verification.Name},
}...)
rawData := parsedInput.GetRawParsedData()
if rawData != verification.Value {
return utils.LavaFormatWarning("[-] verify failed expected and received are different", err,
utils.LogAttr("chainId", chainId),
utils.LogAttr("nodeUrl", proxyUrl.Url),
utils.LogAttr("rawParsedBlock", rawData),
utils.LogAttr("verification.Value", verification.Value),
utils.LogAttr("Method", parsing.GetApiName()),
utils.LogAttr("Extension", verification.Extension),
utils.LogAttr("Addon", verification.Addon),
utils.LogAttr("Verification", verification.Name),
)
}
}
utils.LavaFormatInfo("[+] verified successfully",
utils.Attribute{Key: "chainId", Value: chainId},
utils.Attribute{Key: "nodeUrl", Value: proxyUrl.Url},
utils.Attribute{Key: "verification", Value: verification.Name},
utils.Attribute{Key: "value", Value: parser.CapStringLen(parsedResult)},
utils.Attribute{Key: "verificationKey", Value: verification.VerificationKey},
utils.Attribute{Key: "apiInterface", Value: cf.endpoint.ApiInterface},
utils.LogAttr("chainId", chainId),
utils.LogAttr("nodeUrl", proxyUrl.Url),
utils.LogAttr("verification", verification.Name),
utils.LogAttr("block", parsedInput.GetBlock()),
utils.LogAttr("rawData", parsedInput.GetRawParsedData()),
utils.LogAttr("verificationKey", verification.VerificationKey),
utils.LogAttr("apiInterface", cf.endpoint.ApiInterface),
)
return nil
}
Expand Down Expand Up @@ -292,8 +295,9 @@ func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error)
{Key: "error", Value: err},
}...)
}
blockNum, err := parser.ParseBlockFromReply(parserInput, parsing.ResultParsing)
if err != nil {
parsedInput := parser.ParseBlockFromReply(parserInput, parsing.ResultParsing, parsing.Parsers)
blockNum := parsedInput.GetBlock()
if blockNum == spectypes.NOT_APPLICABLE {
return spectypes.NOT_APPLICABLE, utils.LavaFormatDebug(tagName+" Failed to parse Response", []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
Expand Down Expand Up @@ -355,7 +359,7 @@ func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64)
}...)
}

res, err := parser.ParseFromReplyAndDecode(parserInput, parsing.ResultParsing)
res, err := parser.ParseBlockHashFromReplyAndDecode(parserInput, parsing.ResultParsing, parsing.Parsers)
if err != nil {
return "", utils.LavaFormatDebug(tagName+" Failed ParseMessageResponse", []utils.Attribute{
{Key: "error", Value: err},
Expand Down
4 changes: 4 additions & 0 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (cwm *ConsumerWebsocketManager) handleRateLimitReached(inpData []byte) ([]b
}

func (cwm *ConsumerWebsocketManager) ListenToMessages() {
// adding metrics for how many active connections we have.
cwm.rpcConsumerLogs.SetWebSocketConnectionActive(cwm.chainId, cwm.apiInterface, true)
defer cwm.rpcConsumerLogs.SetWebSocketConnectionActive(cwm.chainId, cwm.apiInterface, false)

var (
messageType int
msg []byte
Expand Down
10 changes: 4 additions & 6 deletions protocol/chainlib/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,8 @@ func TestParsingRequestedBlocksHeadersGrpc(t *testing.T) {
require.NoError(t, err)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
require.NoError(t, err)
blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing)
require.NoError(t, err)
require.Equal(t, test.block, blockNum)
parsedInput := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing, nil)
require.Equal(t, test.block, parsedInput.GetBlock())
})
}
}
Expand Down Expand Up @@ -287,9 +286,8 @@ func TestSettingBlocksHeadersGrpc(t *testing.T) {
require.NoError(t, err)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
require.NoError(t, err)
blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing)
require.NoError(t, err)
require.Equal(t, test.block, blockNum)
parsedInput := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing, nil)
require.Equal(t, test.block, parsedInput.GetBlock())
})
}
}
2 changes: 1 addition & 1 deletion protocol/chainlib/grpcproxy/grpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (RawBytesCodec) Marshal(v interface{}) ([]byte, error) {
func (RawBytesCodec) Unmarshal(data []byte, v interface{}) error {
bufferPtr, ok := v.(*[]byte)
if !ok {
return utils.LavaFormatError("cannot decode into type", nil, utils.LogAttr("v", v), utils.LogAttr("data", data))
return utils.LavaFormatDebug("cannot decode into type", utils.LogAttr("v", v), utils.LogAttr("data", data))
}
*bufferPtr = data
return nil
Expand Down
5 changes: 3 additions & 2 deletions protocol/chainlib/jsonRPC_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ func TestJsonRpcChainProxy(t *testing.T) {
require.NoError(t, err)

_, err = chainFetcher.FetchBlockHashByNum(ctx, block)
errMsg := "GET_BLOCK_BY_NUM Failed ParseMessageResponse {error:invalid parser input format"
require.True(t, err.Error()[:len(errMsg)] == errMsg, err.Error())
actualErrMsg := "GET_BLOCK_BY_NUM Failed ParseMessageResponse {error:blockParsing - parse failed {error:invalid parser input format,"
expectedErrMsg := err.Error()[:len(actualErrMsg)]
require.Equal(t, actualErrMsg, expectedErrMsg, err.Error())
}

func TestAddonAndVerifications(t *testing.T) {
Expand Down
10 changes: 4 additions & 6 deletions protocol/chainlib/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,8 @@ func TestParsingRequestedBlocksHeadersRest(t *testing.T) {
require.NoError(t, err)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
require.NoError(t, err)
blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing)
require.NoError(t, err)
require.Equal(t, test.block, blockNum)
parsedInput := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing, nil)
require.Equal(t, test.block, parsedInput.GetBlock())
})
}
}
Expand Down Expand Up @@ -289,9 +288,8 @@ func TestSettingRequestedBlocksHeadersRest(t *testing.T) {
require.NoError(t, err)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
require.NoError(t, err)
blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing)
require.NoError(t, err)
require.Equal(t, test.block, blockNum)
parsedInput := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing, nil)
require.Equal(t, test.block, parsedInput.GetBlock())
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions protocol/metrics/metrics_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ConsumerMetricsManager struct {
totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec
totalWsSubscriptionDissconnectMetric *prometheus.CounterVec
totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec
totalWebSocketConnectionsActive *prometheus.GaugeVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
Expand Down Expand Up @@ -113,6 +114,11 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalWebSocketConnectionsActive := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_total_websocket_connections_active",
Help: "The total number of currently active websocket connections with users",
}, []string{"spec", "apiInterface"})

totalWsSubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_ws_subscription_disconnect",
Help: "The total number of websocket subscription disconnects over time per chain id per api interface per dissconnect reason.",
Expand Down Expand Up @@ -218,6 +224,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
prometheus.MustRegister(totalRelaysSentByNewBatchTickerMetric)
prometheus.MustRegister(totalWebSocketConnectionsActive)
prometheus.MustRegister(apiSpecificsMetric)
prometheus.MustRegister(averageLatencyMetric)
prometheus.MustRegister(totalRelaysSentToProvidersMetric)
Expand All @@ -238,6 +245,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
totalFailedWsSubscriptionRequestsMetric: totalFailedWsSubscriptionRequestsMetric,
totalDuplicatedWsSubscriptionRequestsMetric: totalDuplicatedWsSubscriptionRequestsMetric,
totalWsSubscriptionDissconnectMetric: totalWsSubscriptionDissconnectMetric,
totalWebSocketConnectionsActive: totalWebSocketConnectionsActive,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
Expand Down Expand Up @@ -297,6 +305,17 @@ func (pme *ConsumerMetricsManager) SetRelaySentToProviderMetric(chainId string,
pme.totalRelaysSentToProvidersMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetWebSocketConnectionActive(chainId string, apiInterface string, add bool) {
if pme == nil {
return
}
if add {
pme.totalWebSocketConnectionsActive.WithLabelValues(chainId, apiInterface).Add(1)
} else {
pme.totalWebSocketConnectionsActive.WithLabelValues(chainId, apiInterface).Sub(1)
}
}

func (pme *ConsumerMetricsManager) SetRelayNodeErrorMetric(chainId string, apiInterface string) {
if pme == nil {
return
Expand Down
4 changes: 4 additions & 0 deletions protocol/metrics/rpcconsumerlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func NewRPCConsumerLogs(consumerMetricsManager *ConsumerMetricsManager, consumer
return rpcConsumerLogs, err
}

func (rpccl *RPCConsumerLogs) SetWebSocketConnectionActive(chainId string, apiInterface string, add bool) {
rpccl.consumerMetricsManager.SetWebSocketConnectionActive(chainId, apiInterface, add)
}

func (rpccl *RPCConsumerLogs) SetRelaySentToProviderMetric(chainId string, apiInterface string) {
rpccl.consumerMetricsManager.SetRelaySentToProviderMetric(chainId, apiInterface)
}
Expand Down
Loading

0 comments on commit 329fea7

Please sign in to comment.