diff --git a/protocol/chainlib/chain_message.go b/protocol/chainlib/chain_message.go index 39c0110c9..4d973393b 100644 --- a/protocol/chainlib/chain_message.go +++ b/protocol/chainlib/chain_message.go @@ -12,6 +12,11 @@ import ( spectypes "github.com/lavanet/lava/v4/x/spec/types" ) +type OriginalRaw struct { + Path string + Data []byte +} + type updatableRPCInput interface { rpcInterfaceMessages.GenericMessage UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) @@ -32,13 +37,17 @@ type baseChainMessageContainer struct { forceCacheRefresh bool parseDirective *spectypes.ParseDirective // setting the parse directive related to the api, can be nil usedDefaultValue bool - - inputHashCache []byte + originalRaw OriginalRaw + inputHashCache []byte // resultErrorParsingMethod passed by each api interface message to parse the result of the message // and validate it doesn't contain a node error resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string) } +func (bcmc *baseChainMessageContainer) GetOriginal() (path string, data []byte) { + return bcmc.originalRaw.Path, bcmc.originalRaw.Data +} + func (bcmc *baseChainMessageContainer) UpdateEarliestInMessage(incomingEarliest int64) bool { updatedSuccessfully := false if bcmc.earliestRequestedBlock != spectypes.EARLIEST_BLOCK { diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 41c024ae0..2b77a97c3 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -113,6 +113,7 @@ type ChainMessageForSend interface { GetApiCollection() *spectypes.ApiCollection GetParseDirective() *spectypes.ParseDirective CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) + GetOriginal() (path string, data []byte) } type HealthReporter interface { diff --git a/protocol/chainlib/chainlib_mock.go b/protocol/chainlib/chainlib_mock.go index 3978d88b8..60dfc1604 100644 --- a/protocol/chainlib/chainlib_mock.go +++ b/protocol/chainlib/chainlib_mock.go @@ -320,6 +320,14 @@ func (m *MockChainMessage) CheckResponseError(data []byte, httpStatusCode int) ( return ret0, ret1 } +func (m *MockChainMessage) GetOriginal() (string,[]byte) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOriginal") + ret0, _ := ret[0].(string) + ret1, _ := ret[0].([]byte) + return ret0,ret1 +} + // CheckResponseError indicates an expected call of CheckResponseError. func (mr *MockChainMessageMockRecorder) CheckResponseError(data, httpStatusCode interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 081941df2..1912f5587 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -117,7 +117,7 @@ func (apip *GrpcChainParser) CraftMessage(parsing *spectypes.ParseDirective, con } parsedInput := &parser.ParsedInput{} parsedInput.SetBlock(spectypes.NOT_APPLICABLE) - return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection), nil + return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection, OriginalRaw{Path: grpcMessage.Path, Data: grpcMessage.Msg}), nil } // ParseMsg parses message data into chain message object @@ -172,12 +172,12 @@ func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType st } } - nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection) + nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection, OriginalRaw{Data: data, Path: url}) apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo) return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer { +func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection, originalRaw OriginalRaw) *baseChainMessageContainer { requestedBlock := parsedInput.GetBlock() requestedHashes, _ := parsedInput.GetBlockHashes() nodeMsg := &baseChainMessageContainer{ @@ -189,6 +189,7 @@ func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser. resultErrorParsingMethod: grpcMessage.CheckResponseError, parseDirective: GetParseDirective(api, apiCollection), usedDefaultValue: parsedInput.UsedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index de4826337..a868a2f4f 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -92,7 +92,12 @@ func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective, if err != nil { return nil, err } - return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false), nil + originalRaw := OriginalRaw{Path: "", Data: []byte{}} + data, err := json.Marshal(msg) + if err == nil { + originalRaw.Data = data + } + return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false, originalRaw), nil } // this func parses message data into chain message object @@ -215,9 +220,9 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType var nodeMsg *baseChainMessageContainer if len(msgs) == 1 { - nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault) + nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault, OriginalRaw{Path: url, Data: data}) } else { - nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault) + nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault, OriginalRaw{Path: url, Data: data}) if err != nil { return nil, err } @@ -226,7 +231,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) { +func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) (*baseChainMessageContainer, error) { batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs) if err != nil { return nil, err @@ -241,11 +246,12 @@ func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, reque resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch, parseDirective: nil, usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg, err } -func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer { +func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) *baseChainMessageContainer { nodeMsg := &baseChainMessageContainer{ api: serviceApi, apiCollection: apiCollection, @@ -255,6 +261,7 @@ func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedB resultErrorParsingMethod: msg.CheckResponseError, parseDirective: GetParseDirective(serviceApi, apiCollection), usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index c8d651396..9feb00997 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -75,7 +75,7 @@ func (apip *RestChainParser) CraftMessage(parsing *spectypes.ParseDirective, con } parsedInput := parser.NewParsedInput() parsedInput.SetBlock(spectypes.NOT_APPLICABLE) - return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection), nil + return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection, OriginalRaw{Path: restMessage.Path, Data: restMessage.Msg}), nil } // ParseMsg parses message data into chain message object @@ -132,12 +132,12 @@ func (apip *RestChainParser) ParseMsg(urlPath string, data []byte, connectionTyp } } - nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection) + nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection, OriginalRaw{Data: data, Path: urlPath}) apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo) return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer { +func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection, originalRaw OriginalRaw) *baseChainMessageContainer { requestedBlock := parsedInput.GetBlock() requestedHashes, _ := parsedInput.GetBlockHashes() nodeMsg := &baseChainMessageContainer{ @@ -149,6 +149,7 @@ func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser. resultErrorParsingMethod: restMessage.CheckResponseError, parseDirective: GetParseDirective(api, apiCollection), usedDefaultValue: parsedInput.UsedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 3bb886709..18e0055e3 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -90,7 +90,12 @@ func (apip *TendermintChainParser) CraftMessage(parsing *spectypes.ParseDirectiv return nil, err } tenderMsg := rpcInterfaceMessages.TendermintrpcMessage{JsonrpcMessage: msg, Path: parsing.ApiName} - return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, &tenderMsg, apiCollection, false), nil + originalRaw := OriginalRaw{Path: tenderMsg.Path, Data: []byte{}} + data, err := json.Marshal(msg) + if err == nil { + originalRaw.Data = data + } + return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, &tenderMsg, apiCollection, false, originalRaw), nil } // ParseMsg parses message data into chain message object @@ -243,10 +248,10 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect if !isJsonrpc { tenderMsg.Path = urlPath // add path } - nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &tenderMsg, apiCollection, parsedDefault) + nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &tenderMsg, apiCollection, parsedDefault, OriginalRaw{Data: data, Path: urlPath}) } else { var err error - nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault) + nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault, OriginalRaw{Data: data, Path: urlPath}) if err != nil { return nil, err } @@ -256,7 +261,7 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) { +func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) (*baseChainMessageContainer, error) { batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs) if err != nil { return nil, err @@ -271,6 +276,7 @@ func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, re resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch, parseDirective: GetParseDirective(serviceApi, apiCollection), usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg, err } @@ -281,7 +287,7 @@ func (apip *TendermintChainParser) ExtractDataFromRequest(request *http.Request) return url, data, "", metadata, err } -func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer { +func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) *baseChainMessageContainer { nodeMsg := &baseChainMessageContainer{ api: serviceApi, apiCollection: apiCollection, @@ -291,6 +297,7 @@ func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, request resultErrorParsingMethod: msg.CheckResponseError, parseDirective: GetParseDirective(serviceApi, apiCollection), usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a774f0e1d..b2576f899 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -173,6 +173,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bo if success { rpccs.initialized.Store(true) } + go rpccs.ExtractNodeData() return success, err } @@ -1612,3 +1613,44 @@ func (rpccs *RPCConsumerServer) updateProtocolMessageIfNeededWithNewEarliestData } return protocolMessage } + +// we implement rpcConsumerServer as a chain router so we can use it in a chainFetcher +// SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality +// ExtensionsSupported(internalPath string, extensions []string) bool +func (rpccs *RPCConsumerServer) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessage, extensions []string) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer pairingtypes.RelayerClient, proxyUrl common.NodeUrl, chainId string, err error) { + ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + reqBlock, _ := chainMessage.RequestedBlock() + userData := common.UserData{DappId: initRelaysDappId, ConsumerIp: initRelaysConsumerIp} + seenBlock, _ := rpccs.consumerConsistency.GetSeenBlock(userData) + collectionData := chainMessage.GetApiCollection().CollectionData + path, data := chainMessage.GetOriginal() + relay := lavaprotocol.NewRelayData(ctx, collectionData.Type, path, data, seenBlock, reqBlock, rpccs.listenEndpoint.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), chainlib.GetAddon(chainMessage), nil) + protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relay, userData.DappId, userData.ConsumerIp) + chainId = rpccs.listenEndpoint.ChainID + proxyUrl = common.NodeUrl{Url: "rpcconsumer"} + relayProcessor, err := rpccs.ProcessRelaySend(ctx, protocolMessage, nil) + if err != nil && !relayProcessor.HasResults() { + return nil, "", nil, proxyUrl, chainId, err + } + returnedResult, err := relayProcessor.ProcessingResult() + return returnedResult.Reply, "", nil, proxyUrl, chainId, err +} + +func (rpccs *RPCConsumerServer) ExtensionsSupported(internalPath string, extensions []string) bool { + configuredExtensions := rpccs.chainParser.ExtensionsParser().GetConfiguredExtensions() + configured := map[string]struct{}{} + for _, extension := range configuredExtensions { + configured[extension.Name] = struct{}{} + } + for _, extension := range extensions { + if _, found := configured[extension]; !found { + return false + } + } + return true +} + +// this function sends relays to the provider and according to the results enhances capabilities of the consumer such as parsing of data and errors +func (rpccs *RPCConsumerServer) ExtractNodeData() { + +}