From 23a591dbd76b2926791e06a723348bc28e811400 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Mon, 2 Dec 2024 16:18:51 +0200 Subject: [PATCH] Fix solana subscription --- cookbook/specs/solana.json | 151 +++++++++++------- .../chainlib/chainproxy/rpcclient/handler.go | 24 ++- .../chainlib/chainproxy/rpcclient/json.go | 13 +- .../chainproxy/rpcclient/subscription.go | 2 +- 4 files changed, 119 insertions(+), 71 deletions(-) diff --git a/cookbook/specs/solana.json b/cookbook/specs/solana.json index 5089de1331..07d746c62f 100755 --- a/cookbook/specs/solana.json +++ b/cookbook/specs/solana.json @@ -964,7 +964,91 @@ "stateful": 0 }, "extra_compute_units": 0 + } + ], + "headers": [], + "inheritance_apis": [], + "parse_directives": [ + { + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}", + "function_tag": "GET_BLOCKNUM", + "result_parsing": { + "parser_arg": [ + "0", + "context", + "slot" + ], + "parser_func": "PARSE_CANONICAL" + }, + "api_name": "getLatestBlockhash" }, + { + "function_tag": "GET_BLOCK_BY_NUM", + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getBlock\",\"params\":[%d,{\"transactionDetails\":\"none\",\"rewards\":false}],\"id\":1}", + "result_parsing": { + "parser_arg": [ + "0", + "blockhash" + ], + "parser_func": "PARSE_CANONICAL", + "encoding": "base64" + }, + "api_name": "getBlock" + } + ], + "verifications": [ + { + "name": "version", + "parse_directive": { + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getVersion\",\"params\":[],\"id\":1}", + "function_tag": "VERIFICATION", + "result_parsing": { + "parser_arg": [ + "0", + "solana-core" + ], + "parser_func": "PARSE_CANONICAL" + }, + "api_name": "getVersion" + }, + "values": [ + { + "expected_value": "*" + } + ] + }, + { + "name": "tokens-owner-indexed", + "parse_directive": { + "function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}", + "function_tag": "VERIFICATION", + "result_parsing": { + "parser_arg": [ + "0", + "value" + ], + "parser_func": "PARSE_CANONICAL" + }, + "api_name": "getTokenAccountsByOwner" + }, + "values": [ + { + "expected_value": "*", + "severity": "Warning" + } + ] + } + ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws", + "type": "POST", + "add_on": "" + }, + "apis": [ { "name": "accountSubscribe", "block_parsing": { @@ -1294,30 +1378,17 @@ "inheritance_apis": [], "parse_directives": [ { - "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}", "function_tag": "GET_BLOCKNUM", "result_parsing": { - "parser_arg": [ - "0", - "context", - "slot" - ], - "parser_func": "PARSE_CANONICAL" - }, - "api_name": "getLatestBlockhash" + "parser_func": "DEFAULT" + } }, { + "function_template": "%d", "function_tag": "GET_BLOCK_BY_NUM", - "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getBlock\",\"params\":[%d,{\"transactionDetails\":\"none\",\"rewards\":false}],\"id\":1}", "result_parsing": { - "parser_arg": [ - "0", - "blockhash" - ], - "parser_func": "PARSE_CANONICAL", - "encoding": "base64" - }, - "api_name": "getBlock" + "parser_func": "DEFAULT" + } }, { "function_tag": "SUBSCRIBE", @@ -1401,49 +1472,7 @@ "api_name": "voteUnsubscribe" } ], - "verifications": [ - { - "name": "version", - "parse_directive": { - "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getVersion\",\"params\":[],\"id\":1}", - "function_tag": "VERIFICATION", - "result_parsing": { - "parser_arg": [ - "0", - "solana-core" - ], - "parser_func": "PARSE_CANONICAL" - }, - "api_name": "getVersion" - }, - "values": [ - { - "expected_value": "*" - } - ] - }, - { - "name": "tokens-owner-indexed", - "parse_directive": { - "function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}", - "function_tag": "VERIFICATION", - "result_parsing": { - "parser_arg": [ - "0", - "value" - ], - "parser_func": "PARSE_CANONICAL" - }, - "api_name": "getTokenAccountsByOwner" - }, - "values": [ - { - "expected_value": "*", - "severity": "Warning" - } - ] - } - ] + "verifications": [] } ] }, diff --git a/protocol/chainlib/chainproxy/rpcclient/handler.go b/protocol/chainlib/chainproxy/rpcclient/handler.go index bb1aa31199..acedfd97d9 100755 --- a/protocol/chainlib/chainproxy/rpcclient/handler.go +++ b/protocol/chainlib/chainproxy/rpcclient/handler.go @@ -237,13 +237,16 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool { h.handleSubscriptionResultTendermint(msg) return true case msg.isEthereumNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) { h.handleSubscriptionResultEthereum(msg) return true + } else if strings.HasSuffix(msg.Method, solanaNotificationMethodSuffix) { + h.handleSubscriptionResultSolana(msg) + return true } return false case msg.isStarkNetPathfinderNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) { h.handleSubscriptionResultStarkNetPathfinder(msg) return true } @@ -258,7 +261,7 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool { } func (h *handler) handleSubscriptionResultStarkNetPathfinder(msg *JsonrpcMessage) { - var result starkNetPathfinderSubscriptionResult + var result integerIdSubscriptionResult if err := json.Unmarshal(msg.Result, &result); err != nil { utils.LavaFormatTrace("Dropping invalid starknet pathfinder subscription message", utils.LogAttr("err", err), @@ -290,6 +293,21 @@ func (h *handler) handleSubscriptionResultEthereum(msg *JsonrpcMessage) { } } +func (h *handler) handleSubscriptionResultSolana(msg *JsonrpcMessage) { + var result integerIdSubscriptionResult + if err := json.Unmarshal(msg.Params, &result); err != nil { + utils.LavaFormatTrace("Dropping invalid solana subscription message", + utils.LogAttr("err", err), + utils.LogAttr("params", string(msg.Params)), + ) + h.log.Debug("Dropping invalid subscription message") + return + } + if h.clientSubs[strconv.Itoa(result.ID)] != nil { + h.clientSubs[strconv.Itoa(result.ID)].deliver(msg) + } +} + func (h *handler) handleSubscriptionResultTendermint(msg *JsonrpcMessage) { var result tendermintSubscriptionResult if err := json.Unmarshal(msg.Result, &result); err != nil { diff --git a/protocol/chainlib/chainproxy/rpcclient/json.go b/protocol/chainlib/chainproxy/rpcclient/json.go index 794ad3ebe4..84ab2e0a6e 100755 --- a/protocol/chainlib/chainproxy/rpcclient/json.go +++ b/protocol/chainlib/chainproxy/rpcclient/json.go @@ -33,11 +33,12 @@ import ( ) const ( - Vsn = "2.0" - serviceMethodSeparator = "_" - subscribeMethodSuffix = "_subscribe" - unsubscribeMethodSuffix = "_unsubscribe" - notificationMethodSuffix = "_subscription" + Vsn = "2.0" + serviceMethodSeparator = "_" + subscribeMethodSuffix = "_subscribe" + unsubscribeMethodSuffix = "_unsubscribe" + ethereumNotificationMethodSuffix = "_subscription" + solanaNotificationMethodSuffix = "Notification" defaultWriteTimeout = 10 * time.Second // used if context has no deadline ) @@ -49,7 +50,7 @@ type ethereumSubscriptionResult struct { Result json.RawMessage `json:"result,omitempty"` } -type starkNetPathfinderSubscriptionResult struct { +type integerIdSubscriptionResult struct { ID int `json:"subscription"` Result json.RawMessage `json:"result,omitempty"` } diff --git a/protocol/chainlib/chainproxy/rpcclient/subscription.go b/protocol/chainlib/chainproxy/rpcclient/subscription.go index 803ecc171c..cc882364ac 100755 --- a/protocol/chainlib/chainproxy/rpcclient/subscription.go +++ b/protocol/chainlib/chainproxy/rpcclient/subscription.go @@ -181,7 +181,7 @@ func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { ctx := context.Background() return n.h.conn.writeJSON(ctx, &JsonrpcMessage{ Version: Vsn, - Method: n.namespace + notificationMethodSuffix, + Method: n.namespace + ethereumNotificationMethodSuffix, Params: params, }) }