Skip to content

Commit

Permalink
consumer is now supporting chainRouter interface
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Dec 11, 2024
1 parent 4631607 commit 240f5f9
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 18 deletions.
13 changes: 11 additions & 2 deletions protocol/chainlib/chain_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
spectypes "github.com/lavanet/lava/v4/x/spec/types"
)

type OriginalRaw struct {
Path string
Data []byte
}

type updatableRPCInput interface {
rpcInterfaceMessages.GenericMessage
UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool)
Expand All @@ -32,13 +37,17 @@ type baseChainMessageContainer struct {
forceCacheRefresh bool
parseDirective *spectypes.ParseDirective // setting the parse directive related to the api, can be nil
usedDefaultValue bool

inputHashCache []byte
originalRaw OriginalRaw
inputHashCache []byte
// resultErrorParsingMethod passed by each api interface message to parse the result of the message
// and validate it doesn't contain a node error
resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
}

func (bcmc *baseChainMessageContainer) GetOriginal() (path string, data []byte) {
return bcmc.originalRaw.Path, bcmc.originalRaw.Data
}

func (bcmc *baseChainMessageContainer) UpdateEarliestInMessage(incomingEarliest int64) bool {
updatedSuccessfully := false
if bcmc.earliestRequestedBlock != spectypes.EARLIEST_BLOCK {
Expand Down
1 change: 1 addition & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type ChainMessageForSend interface {
GetApiCollection() *spectypes.ApiCollection
GetParseDirective() *spectypes.ParseDirective
CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
GetOriginal() (path string, data []byte)
}

type HealthReporter interface {
Expand Down
8 changes: 8 additions & 0 deletions protocol/chainlib/chainlib_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (apip *GrpcChainParser) CraftMessage(parsing *spectypes.ParseDirective, con
}
parsedInput := &parser.ParsedInput{}
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection), nil
return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection, OriginalRaw{Path: grpcMessage.Path, Data: grpcMessage.Msg}), nil
}

// ParseMsg parses message data into chain message object
Expand Down Expand Up @@ -172,12 +172,12 @@ func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType st
}
}

nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection)
nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection, OriginalRaw{Data: data, Path: url})
apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo)
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection, originalRaw OriginalRaw) *baseChainMessageContainer {
requestedBlock := parsedInput.GetBlock()
requestedHashes, _ := parsedInput.GetBlockHashes()
nodeMsg := &baseChainMessageContainer{
Expand All @@ -189,6 +189,7 @@ func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.
resultErrorParsingMethod: grpcMessage.CheckResponseError,
parseDirective: GetParseDirective(api, apiCollection),
usedDefaultValue: parsedInput.UsedDefaultValue,
originalRaw: originalRaw,
}
return nodeMsg
}
Expand Down
17 changes: 12 additions & 5 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective,
if err != nil {
return nil, err
}
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false), nil
originalRaw := OriginalRaw{Path: "", Data: []byte{}}
data, err := json.Marshal(msg)
if err == nil {
originalRaw.Data = data
}
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false, originalRaw), nil
}

// this func parses message data into chain message object
Expand Down Expand Up @@ -215,9 +220,9 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType

var nodeMsg *baseChainMessageContainer
if len(msgs) == 1 {
nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault)
nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault, OriginalRaw{Path: url, Data: data})
} else {
nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault)
nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault, OriginalRaw{Path: url, Data: data})
if err != nil {
return nil, err
}
Expand All @@ -226,7 +231,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) {
func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) (*baseChainMessageContainer, error) {
batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs)
if err != nil {
return nil, err
Expand All @@ -241,11 +246,12 @@ func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, reque
resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch,
parseDirective: nil,
usedDefaultValue: usedDefaultValue,
originalRaw: originalRaw,
}
return nodeMsg, err
}

func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer {
func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) *baseChainMessageContainer {
nodeMsg := &baseChainMessageContainer{
api: serviceApi,
apiCollection: apiCollection,
Expand All @@ -255,6 +261,7 @@ func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedB
resultErrorParsingMethod: msg.CheckResponseError,
parseDirective: GetParseDirective(serviceApi, apiCollection),
usedDefaultValue: usedDefaultValue,
originalRaw: originalRaw,
}
return nodeMsg
}
Expand Down
7 changes: 4 additions & 3 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (apip *RestChainParser) CraftMessage(parsing *spectypes.ParseDirective, con
}
parsedInput := parser.NewParsedInput()
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection), nil
return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection, OriginalRaw{Path: restMessage.Path, Data: restMessage.Msg}), nil
}

// ParseMsg parses message data into chain message object
Expand Down Expand Up @@ -132,12 +132,12 @@ func (apip *RestChainParser) ParseMsg(urlPath string, data []byte, connectionTyp
}
}

nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection)
nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection, OriginalRaw{Data: data, Path: urlPath})
apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo)
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection, originalRaw OriginalRaw) *baseChainMessageContainer {
requestedBlock := parsedInput.GetBlock()
requestedHashes, _ := parsedInput.GetBlockHashes()
nodeMsg := &baseChainMessageContainer{
Expand All @@ -149,6 +149,7 @@ func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.
resultErrorParsingMethod: restMessage.CheckResponseError,
parseDirective: GetParseDirective(api, apiCollection),
usedDefaultValue: parsedInput.UsedDefaultValue,
originalRaw: originalRaw,
}
return nodeMsg
}
Expand Down
17 changes: 12 additions & 5 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ func (apip *TendermintChainParser) CraftMessage(parsing *spectypes.ParseDirectiv
return nil, err
}
tenderMsg := rpcInterfaceMessages.TendermintrpcMessage{JsonrpcMessage: msg, Path: parsing.ApiName}
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, &tenderMsg, apiCollection, false), nil
originalRaw := OriginalRaw{Path: tenderMsg.Path, Data: []byte{}}
data, err := json.Marshal(msg)
if err == nil {
originalRaw.Data = data
}
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, &tenderMsg, apiCollection, false, originalRaw), nil
}

// ParseMsg parses message data into chain message object
Expand Down Expand Up @@ -243,10 +248,10 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect
if !isJsonrpc {
tenderMsg.Path = urlPath // add path
}
nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &tenderMsg, apiCollection, parsedDefault)
nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &tenderMsg, apiCollection, parsedDefault, OriginalRaw{Data: data, Path: urlPath})
} else {
var err error
nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault)
nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault, OriginalRaw{Data: data, Path: urlPath})
if err != nil {
return nil, err
}
Expand All @@ -256,7 +261,7 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) {
func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) (*baseChainMessageContainer, error) {
batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs)
if err != nil {
return nil, err
Expand All @@ -271,6 +276,7 @@ func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, re
resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch,
parseDirective: GetParseDirective(serviceApi, apiCollection),
usedDefaultValue: usedDefaultValue,
originalRaw: originalRaw,
}
return nodeMsg, err
}
Expand All @@ -281,7 +287,7 @@ func (apip *TendermintChainParser) ExtractDataFromRequest(request *http.Request)
return url, data, "", metadata, err
}

func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer {
func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) *baseChainMessageContainer {
nodeMsg := &baseChainMessageContainer{
api: serviceApi,
apiCollection: apiCollection,
Expand All @@ -291,6 +297,7 @@ func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, request
resultErrorParsingMethod: msg.CheckResponseError,
parseDirective: GetParseDirective(serviceApi, apiCollection),
usedDefaultValue: usedDefaultValue,
originalRaw: originalRaw,
}
return nodeMsg
}
Expand Down
42 changes: 42 additions & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bo
if success {
rpccs.initialized.Store(true)
}
go rpccs.ExtractNodeData()
return success, err
}

Expand Down Expand Up @@ -1612,3 +1613,44 @@ func (rpccs *RPCConsumerServer) updateProtocolMessageIfNeededWithNewEarliestData
}
return protocolMessage
}

// we implement rpcConsumerServer as a chain router so we can use it in a chainFetcher
// SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality
// ExtensionsSupported(internalPath string, extensions []string) bool
func (rpccs *RPCConsumerServer) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessage, extensions []string) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer pairingtypes.RelayerClient, proxyUrl common.NodeUrl, chainId string, err error) {
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
reqBlock, _ := chainMessage.RequestedBlock()
userData := common.UserData{DappId: initRelaysDappId, ConsumerIp: initRelaysConsumerIp}
seenBlock, _ := rpccs.consumerConsistency.GetSeenBlock(userData)
collectionData := chainMessage.GetApiCollection().CollectionData
path, data := chainMessage.GetOriginal()
relay := lavaprotocol.NewRelayData(ctx, collectionData.Type, path, data, seenBlock, reqBlock, rpccs.listenEndpoint.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), chainlib.GetAddon(chainMessage), nil)
protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relay, userData.DappId, userData.ConsumerIp)
chainId = rpccs.listenEndpoint.ChainID
proxyUrl = common.NodeUrl{Url: "rpcconsumer"}
relayProcessor, err := rpccs.ProcessRelaySend(ctx, protocolMessage, nil)
if err != nil && !relayProcessor.HasResults() {
return nil, "", nil, proxyUrl, chainId, err
}
returnedResult, err := relayProcessor.ProcessingResult()
return returnedResult.Reply, "", nil, proxyUrl, chainId, err
}

func (rpccs *RPCConsumerServer) ExtensionsSupported(internalPath string, extensions []string) bool {
configuredExtensions := rpccs.chainParser.ExtensionsParser().GetConfiguredExtensions()
configured := map[string]struct{}{}
for _, extension := range configuredExtensions {
configured[extension.Name] = struct{}{}
}
for _, extension := range extensions {
if _, found := configured[extension]; !found {
return false
}
}
return true
}

// this function sends relays to the provider and according to the results enhances capabilities of the consumer such as parsing of data and errors
func (rpccs *RPCConsumerServer) ExtractNodeData() {

}

0 comments on commit 240f5f9

Please sign in to comment.