Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: consollidate-chain-message-data #1636

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ type RelaySender interface {
consumerIp string,
analytics *metrics.RelayMetrics,
metadata []pairingtypes.Metadata,
) (ChainMessage, map[string]string, *pairingtypes.RelayPrivateData, error)
) (ProtocolMessage, error)
SendParsedRelay(
ctx context.Context,
dappID string,
consumerIp string,
analytics *metrics.RelayMetrics,
chainMessage ChainMessage,
directiveHeaders map[string]string,
relayRequestData *pairingtypes.RelayPrivateData,
protocolMessage ProtocolMessage,
) (relayResult *common.RelayResult, errRet error)
CreateDappKey(dappID, consumerIp string) string
CancelSubscriptionContext(subscriptionKey string)
Expand Down
141 changes: 96 additions & 45 deletions protocol/chainlib/chainlib_mock.go

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

metricsData := metrics.NewRelayAnalytics(dappID, cwm.chainId, cwm.apiInterface)

chainMessage, directiveHeaders, relayRequestData, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, metricsData, nil)
protocolMessage, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, metricsData, nil)
if err != nil {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not parse message", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
Expand All @@ -159,9 +159,9 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(chainMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(chainMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, chainMessage, directiveHeaders, relayRequestData, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("error unsubscribing from subscription", err, utils.LogAttr("GUID", webSocketCtx))
if err == common.SubscriptionNotFoundError {
Expand All @@ -174,15 +174,15 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}
continue
} else if IsFunctionTagOfType(chainMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE_ALL) {
} else if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE_ALL) {
err := cwm.consumerWsSubscriptionManager.UnsubscribeAll(webSocketCtx, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("error unsubscribing from all subscription", err, utils.LogAttr("GUID", webSocketCtx))
}
continue
} else {
// Normal relay over websocket. (not subscription related)
relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, chainMessage, directiveHeaders, relayRequestData)
relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, protocolMessage)
if err != nil {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
Expand All @@ -202,16 +202,16 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}

// Subscription flow
inputFormatter, outputFormatter := formatter.FormatterForRelayRequestAndResponse(relayRequestData.ApiInterface) // we use this to preserve the original jsonrpc id
inputFormatter(relayRequestData.Data) // set the extracted jsonrpc id
inputFormatter, outputFormatter := formatter.FormatterForRelayRequestAndResponse(protocolMessage.GetApiCollection().CollectionData.ApiInterface) // we use this to preserve the original jsonrpc id
inputFormatter(protocolMessage.RelayPrivateData().Data) // set the extracted jsonrpc id

reply, subscriptionMsgsChan, err := cwm.consumerWsSubscriptionManager.StartSubscription(webSocketCtx, chainMessage, directiveHeaders, relayRequestData, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
reply, subscriptionMsgsChan, err := cwm.consumerWsSubscriptionManager.StartSubscription(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("StartSubscription returned an error", err,
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("userIp", userIp),
utils.LogAttr("params", chainMessage.GetRPCMessage().GetParams()),
utils.LogAttr("params", protocolMessage.GetRPCMessage().GetParams()),
)

formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not start subscription", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
Expand Down Expand Up @@ -239,7 +239,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("userIp", userIp),
utils.LogAttr("params", chainMessage.GetRPCMessage().GetParams()),
utils.LogAttr("params", protocolMessage.GetRPCMessage().GetParams()),
)

for subscriptionMsgReply := range subscriptionMsgsChan {
Expand All @@ -250,7 +250,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("userIp", userIp),
utils.LogAttr("params", chainMessage.GetRPCMessage().GetParams()),
utils.LogAttr("params", protocolMessage.GetRPCMessage().GetParams()),
)
}()
}
Expand Down
Loading
Loading