From a26d67ee91eb95d3ec42f78a8650fb0e43f9c286 Mon Sep 17 00:00:00 2001 From: Aki Wu Date: Sat, 20 Jul 2024 14:49:16 +0800 Subject: [PATCH 1/3] update --- lib/client/ws_client.rb | 86 +++++++++++------------------------------ 1 file changed, 22 insertions(+), 64 deletions(-) diff --git a/lib/client/ws_client.rb b/lib/client/ws_client.rb index 2f8138f..2013c65 100644 --- a/lib/client/ws_client.rb +++ b/lib/client/ws_client.rb @@ -9,49 +9,31 @@ module ScaleRb class WsClient def self.start(url) - Async do |task| + + Sync do |task| endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names) - client = WsClient.new - - task.async do - Async::WebSocket::Client.connect(endpoint) do |connection| - Async do - while request = client.next_request - ScaleRb.logger.debug "Sending request: #{request.to_json}" - connection.write(request.to_json) - end - end - # inside main task - while message = connection.read - data = JSON.parse(message) - ScaleRb.logger.debug "Received message: #{data}" - - Async do - client.handle_response(data) - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - task.stop - end - end - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - ensure - task.stop + Async::WebSocket::Client.connect(endpoint) do |connection| + client = WsClient.new(connection) + + main_task = task.async do + client.supported_methods = client.rpc_methods()['methods'] + yield client end - end - task.async do - client.supported_methods = client.rpc_methods()['methods'] - yield client - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - task.stop + while message = connection.read + data = message.parse + ScaleRb.logger.debug "Received message: #{data}" + + task.async do + client.handle_response(data) + end + end + ensure + main_task&.stop end end + end end end @@ -61,8 +43,8 @@ class WsClient include ClientExt attr_accessor :supported_methods - def initialize - @queue = Async::Queue.new + def initialize(connection) + @connection = connection @response_handler = ResponseHandler.new @subscription_handler = SubscriptionHandler.new @request_id = 1 @@ -111,10 +93,6 @@ def unsubscribe(method, subscription_id) end end - def next_request - @queue.dequeue - end - def handle_response(response) if response.key?('id') @response_handler.handle(response) @@ -131,12 +109,10 @@ def request(method, params = []) response_future = Async::Notification.new @response_handler.register(@request_id, proc { |response| - # this is running in the main task response_future.signal(response['result']) }) - request = JsonRpcRequest.new(@request_id, method, params) - @queue.enqueue(request) + @connection.write({ jsonrpc: '2.0', id: @request_id, method: method, params: params }.to_json) @request_id += 1 @@ -144,24 +120,6 @@ def request(method, params = []) end end - class JsonRpcRequest - attr_reader :id, :method, :params - - def initialize(id, method, params = {}) - @id = id - @method = method - @params = params - end - - def to_json(*_args) - { jsonrpc: '2.0', id: @id, method: @method, params: @params }.to_json - end - - # def to_s - # to_json - # end - end - class ResponseHandler def initialize @handlers = {} From 9db22f742553bd975667fbe76b90997be750c461 Mon Sep 17 00:00:00 2001 From: Aki Wu Date: Sat, 20 Jul 2024 15:16:21 +0800 Subject: [PATCH 2/3] update --- examples/ws_client_1.rb | 6 +++--- lib/client/ws_client.rb | 17 +++++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/examples/ws_client_1.rb b/examples/ws_client_1.rb index 57a7e69..ed6d37c 100644 --- a/examples/ws_client_1.rb +++ b/examples/ws_client_1.rb @@ -1,10 +1,10 @@ require 'scale_rb' -# ScaleRb.logger.level = Logger::DEBUG +ScaleRb.logger.level = Logger::DEBUG ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client| block_hash = client.chain_getBlockHash(21585684) runtime_version = client.state_getRuntimeVersion(block_hash) - puts runtime_version['specName'] - puts runtime_version['specVersion'] + puts runtime_version[:specName] + puts runtime_version[:specVersion] end diff --git a/lib/client/ws_client.rb b/lib/client/ws_client.rb index 2013c65..01a13ea 100644 --- a/lib/client/ws_client.rb +++ b/lib/client/ws_client.rb @@ -17,7 +17,7 @@ def self.start(url) client = WsClient.new(connection) main_task = task.async do - client.supported_methods = client.rpc_methods()['methods'] + client.supported_methods = client.rpc_methods()[:methods] yield client end @@ -94,9 +94,9 @@ def unsubscribe(method, subscription_id) end def handle_response(response) - if response.key?('id') + if response.key?(:id) @response_handler.handle(response) - elsif response.key?('method') + elsif response.key?(:method) @subscription_handler.handle(response) else puts "Received an unknown message: #{response}" @@ -109,13 +109,14 @@ def request(method, params = []) response_future = Async::Notification.new @response_handler.register(@request_id, proc { |response| - response_future.signal(response['result']) + response_future.signal(response[:result]) }) - @connection.write({ jsonrpc: '2.0', id: @request_id, method: method, params: params }.to_json) + request = { jsonrpc: '2.0', id: @request_id, method: method, params: params } + ScaleRb.logger.debug "Sending request: #{request}" + @connection.write(request.to_json) @request_id += 1 - response_future.wait end end @@ -131,7 +132,7 @@ def register(id, handler) end def handle(response) - id = response['id'] + id = response[:id] if @handlers.key?(id) handler = @handlers[id] handler.call(response) @@ -156,7 +157,7 @@ def unsubscribe(subscription_id) end def handle(notification) - subscription_id = notification.dig('params', 'subscription') + subscription_id = notification.dig(:params, :subscription) return if subscription_id.nil? if @subscriptions.key?(subscription_id) From 2389deab5cafd53c03cb0e4ef8fa482a02cfb727 Mon Sep 17 00:00:00 2001 From: Aki Wu Date: Sun, 28 Jul 2024 23:58:00 +0800 Subject: [PATCH 3/3] update --- examples/http_client_2.rb | 10 ++++-- examples/ws_client_2.rb | 11 +++++-- examples/ws_client_3.rb | 5 +-- lib/client/client_ext.rb | 2 +- lib/client/http_client.rb | 11 ++++--- lib/client/ws_client.rb | 68 +++++++++++++++++---------------------- 6 files changed, 56 insertions(+), 51 deletions(-) diff --git a/examples/http_client_2.rb b/examples/http_client_2.rb index 0b17d0f..fc19a5d 100644 --- a/examples/http_client_2.rb +++ b/examples/http_client_2.rb @@ -3,7 +3,13 @@ ScaleRb.logger.level = Logger::DEBUG client = ScaleRb::HttpClient.new('https://polkadot-rpc.dwellir.com') -block_number = 21585684 +block_number = 21711742 block_hash = client.chain_getBlockHash(block_number) -storage = client.get_storage(block_hash, 'System', 'Events') +metadata = client.get_metadata(block_hash) + +storage_query = ScaleRb::WsClient::StorageQuery.new( + pallet_name: 'System', + storage_name: 'Events', +) +storage = client.get_storage(block_hash, storage_query, metadata) puts "block #{block_number}(#{block_hash}) has #{storage.length} events" diff --git a/examples/ws_client_2.rb b/examples/ws_client_2.rb index 212a5bc..82e7d39 100644 --- a/examples/ws_client_2.rb +++ b/examples/ws_client_2.rb @@ -1,9 +1,14 @@ require 'scale_rb' -ScaleRb.logger.level = Logger::DEBUG - +# You can have multiple subscriptions at the same time ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client| client.chain_subscribeNewHead do |head| - puts "Received new head at height: #{head['number'].to_i(16)}" + puts "Received new head at height: #{head[:number].to_i(16)}" + end + + client.state_subscribeStorage do |storage| + block_hash = storage[:block] + changes = storage[:changes] + puts "Received #{changes.size} storage changes at block: #{block_hash}" end end diff --git a/examples/ws_client_3.rb b/examples/ws_client_3.rb index bf2e678..006d48a 100644 --- a/examples/ws_client_3.rb +++ b/examples/ws_client_3.rb @@ -2,6 +2,7 @@ # ScaleRb.logger.level = Logger::DEBUG +# Unsubscribe after receiving 5 new heads ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client| count = 0 @@ -9,12 +10,12 @@ count = count + 1 if count < 5 - block_number = head['number'].to_i(16) + block_number = head[:number].to_i(16) block_hash = client.chain_getBlockHash(block_number) puts "Received new head at height: #{block_number}, block hash: #{block_hash}" else unsub_result = client.chain_unsubscribeNewHead(subscription_id) - puts "Unsubscribed from new heads: #{unsub_result}" + puts "Unsubscribe result: #{unsub_result}" end end diff --git a/lib/client/client_ext.rb b/lib/client/client_ext.rb index 021fcf1..70dcad9 100644 --- a/lib/client/client_ext.rb +++ b/lib/client/client_ext.rb @@ -42,7 +42,7 @@ def get_storage(block_hash, storage_query, metadata = nil) def query_storage_at(block_hash, storage_keys, type_id, default, registry) result = state_queryStorageAt(storage_keys, block_hash) result.map do |item| - item['changes'].map do |change| + item[:changes].map do |change| storage_key = change[0] data = change[1] || default storage = data.nil? ? nil : PortableCodec.decode(type_id, data._to_bytes, registry)[0] diff --git a/lib/client/http_client.rb b/lib/client/http_client.rb index 5b4d644..6c111fd 100644 --- a/lib/client/http_client.rb +++ b/lib/client/http_client.rb @@ -17,7 +17,7 @@ def initialize(url) raise 'url format is not correct' unless url.match?(url_regex) @uri = URI.parse(url) - @supported_methods = request('rpc_methods', [])['methods'] + @supported_methods = request('rpc_methods', [])[:methods] end def request(method, params = []) @@ -37,11 +37,12 @@ def request(method, params = []) response = http.request(request) raise response unless response.is_a?(Net::HTTPOK) - body = JSON.parse(response.body) + # parse response, make key symbol + body = JSON.parse(response.body, symbolize_names: true) ScaleRb.logger.debug "Response: #{body}" - raise body['error'] if body['error'] + raise body[:error] if body[:error] - body['result'] + body[:result] end def respond_to_missing?(*_args) @@ -54,4 +55,4 @@ def method_missing(method, *args) request(method.to_s, args) end end -end \ No newline at end of file +end diff --git a/lib/client/ws_client.rb b/lib/client/ws_client.rb index 01a13ea..6ce5dd0 100644 --- a/lib/client/ws_client.rb +++ b/lib/client/ws_client.rb @@ -16,25 +16,27 @@ def self.start(url) Async::WebSocket::Client.connect(endpoint) do |connection| client = WsClient.new(connection) - main_task = task.async do - client.supported_methods = client.rpc_methods()[:methods] - yield client + recv_task = task.async do + while message = connection.read + data = message.parse + ScaleRb.logger.debug "Received message: #{data}" + + task.async do + client.handle_response(data) + end + end end - while message = connection.read - data = message.parse - ScaleRb.logger.debug "Received message: #{data}" + client.supported_methods = client.rpc_methods()[:methods] + yield client - task.async do - client.handle_response(data) - end - end + recv_task.wait ensure - main_task&.stop + recv_task&.stop end - end + end # Sync - end + end # start end end @@ -69,7 +71,7 @@ def method_missing(method, *args) raise "A subscribe method needs a block" unless block_given? subscribe(method, args) do |notification| - yield notification['params']['result'] + yield notification[:params][:result] end else request(method, args) @@ -123,20 +125,20 @@ def request(method, params = []) class ResponseHandler def initialize - @handlers = {} + @callbacks = {} end - # handler: a proc with response data as param - def register(id, handler) - @handlers[id] = handler + # callback: a proc with response data as param + def register(id, callback) + @callbacks[id] = callback end def handle(response) id = response[:id] - if @handlers.key?(id) - handler = @handlers[id] - handler.call(response) - @handlers.delete(id) + if @callbacks.key?(id) + callback = @callbacks[id] + callback.call(response) + @callbacks.delete(id) else ScaleRb.logger.debug "Received a message with unknown id: #{response}" end @@ -145,33 +147,23 @@ def handle(response) class SubscriptionHandler def initialize - @subscriptions = {} + @callbacks = {} end - def subscribe(subscription_id, handler) - @subscriptions[subscription_id] = handler + def subscribe(subscription_id, callback) + @callbacks[subscription_id] = callback end def unsubscribe(subscription_id) - @subscriptions.delete(subscription_id) + @callbacks.delete(subscription_id) end def handle(notification) subscription_id = notification.dig(:params, :subscription) return if subscription_id.nil? - if @subscriptions.key?(subscription_id) - @subscriptions[subscription_id].call(notification) - else - # the subscription_id may be not registered. - # in client.subscribe function, - # ... - # subscription_id = request(method, params) - # @subscription_handler.subscribe(subscription_id, block) - # ... - # the request(method, params) may be slow, so the subscription_id may be not registered when the first notification comes. - sleep 0.01 - handle(notification) + if @callbacks.key?(subscription_id) + @callbacks[subscription_id].call(notification) end end end