Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ws client - 1 #10

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions examples/http_client_2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
ScaleRb.logger.level = Logger::DEBUG

client = ScaleRb::HttpClient.new('https://polkadot-rpc.dwellir.com')
block_number = 21585684
block_number = 21711742
block_hash = client.chain_getBlockHash(block_number)
storage = client.get_storage(block_hash, 'System', 'Events')
metadata = client.get_metadata(block_hash)

storage_query = ScaleRb::WsClient::StorageQuery.new(
pallet_name: 'System',
storage_name: 'Events',
)
storage = client.get_storage(block_hash, storage_query, metadata)
puts "block #{block_number}(#{block_hash}) has #{storage.length} events"
6 changes: 3 additions & 3 deletions examples/ws_client_1.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
require 'scale_rb'

# ScaleRb.logger.level = Logger::DEBUG
ScaleRb.logger.level = Logger::DEBUG

ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
block_hash = client.chain_getBlockHash(21585684)
runtime_version = client.state_getRuntimeVersion(block_hash)
puts runtime_version['specName']
puts runtime_version['specVersion']
puts runtime_version[:specName]
puts runtime_version[:specVersion]
end
11 changes: 8 additions & 3 deletions examples/ws_client_2.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
require 'scale_rb'

ScaleRb.logger.level = Logger::DEBUG

# You can have multiple subscriptions at the same time
ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
client.chain_subscribeNewHead do |head|
puts "Received new head at height: #{head['number'].to_i(16)}"
puts "Received new head at height: #{head[:number].to_i(16)}"
end

client.state_subscribeStorage do |storage|
block_hash = storage[:block]
changes = storage[:changes]
puts "Received #{changes.size} storage changes at block: #{block_hash}"
end
end
5 changes: 3 additions & 2 deletions examples/ws_client_3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

# ScaleRb.logger.level = Logger::DEBUG

# Unsubscribe after receiving 5 new heads
ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
count = 0

subscription_id = client.chain_subscribeNewHead do |head|
count = count + 1

if count < 5
block_number = head['number'].to_i(16)
block_number = head[:number].to_i(16)
block_hash = client.chain_getBlockHash(block_number)
puts "Received new head at height: #{block_number}, block hash: #{block_hash}"
else
unsub_result = client.chain_unsubscribeNewHead(subscription_id)
puts "Unsubscribed from new heads: #{unsub_result}"
puts "Unsubscribe result: #{unsub_result}"
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/client/client_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_storage(block_hash, storage_query, metadata = nil)
def query_storage_at(block_hash, storage_keys, type_id, default, registry)
result = state_queryStorageAt(storage_keys, block_hash)
result.map do |item|
item['changes'].map do |change|
item[:changes].map do |change|
storage_key = change[0]
data = change[1] || default
storage = data.nil? ? nil : PortableCodec.decode(type_id, data._to_bytes, registry)[0]
Expand Down
11 changes: 6 additions & 5 deletions lib/client/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def initialize(url)
raise 'url format is not correct' unless url.match?(url_regex)

@uri = URI.parse(url)
@supported_methods = request('rpc_methods', [])['methods']
@supported_methods = request('rpc_methods', [])[:methods]
end

def request(method, params = [])
Expand All @@ -37,11 +37,12 @@ def request(method, params = [])
response = http.request(request)
raise response unless response.is_a?(Net::HTTPOK)

body = JSON.parse(response.body)
# parse response, make key symbol
body = JSON.parse(response.body, symbolize_names: true)
ScaleRb.logger.debug "Response: #{body}"
raise body['error'] if body['error']
raise body[:error] if body[:error]

body['result']
body[:result]
end

def respond_to_missing?(*_args)
Expand All @@ -54,4 +55,4 @@ def method_missing(method, *args)
request(method.to_s, args)
end
end
end
end
131 changes: 41 additions & 90 deletions lib/client/ws_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,34 @@
module ScaleRb
class WsClient
def self.start(url)
Async do |task|

Sync do |task|
endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)
client = WsClient.new

task.async do
Async::WebSocket::Client.connect(endpoint) do |connection|
Async do
while request = client.next_request
ScaleRb.logger.debug "Sending request: #{request.to_json}"
connection.write(request.to_json)
end
end

# inside main task
Async::WebSocket::Client.connect(endpoint) do |connection|
client = WsClient.new(connection)

recv_task = task.async do
while message = connection.read
data = JSON.parse(message)
data = message.parse
ScaleRb.logger.debug "Received message: #{data}"

Async do
task.async do
client.handle_response(data)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd want to add error handling here - i.e. what happens if handle_response(data) fails?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'll add error handling here

rescue => e
ScaleRb.logger.error "#{e.class}: #{e.message}"
ScaleRb.logger.error e.backtrace.join("\n")
task.stop
end
end
rescue => e
ScaleRb.logger.error "#{e.class}: #{e.message}"
ScaleRb.logger.error e.backtrace.join("\n")
ensure
task.stop
end
end

task.async do
client.supported_methods = client.rpc_methods()['methods']
client.supported_methods = client.rpc_methods()[:methods]
yield client
rescue => e
ScaleRb.logger.error "#{e.class}: #{e.message}"
ScaleRb.logger.error e.backtrace.join("\n")
task.stop

recv_task.wait
ensure
recv_task&.stop
end
end
end
end # Sync

end # start
end
end

Expand All @@ -61,8 +45,8 @@ class WsClient
include ClientExt
attr_accessor :supported_methods

def initialize
@queue = Async::Queue.new
def initialize(connection)
@connection = connection
@response_handler = ResponseHandler.new
@subscription_handler = SubscriptionHandler.new
@request_id = 1
Expand All @@ -87,7 +71,7 @@ def method_missing(method, *args)
raise "A subscribe method needs a block" unless block_given?

subscribe(method, args) do |notification|
yield notification['params']['result']
yield notification[:params][:result]
end
else
request(method, args)
Expand All @@ -111,14 +95,10 @@ def unsubscribe(method, subscription_id)
end
end

def next_request
@queue.dequeue
end

def handle_response(response)
if response.key?('id')
if response.key?(:id)
@response_handler.handle(response)
elsif response.key?('method')
elsif response.key?(:method)
@subscription_handler.handle(response)
else
puts "Received an unknown message: #{response}"
Expand All @@ -131,53 +111,34 @@ def request(method, params = [])
response_future = Async::Notification.new

@response_handler.register(@request_id, proc { |response|
# this is running in the main task
response_future.signal(response['result'])
response_future.signal(response[:result])
})

request = JsonRpcRequest.new(@request_id, method, params)
@queue.enqueue(request)
request = { jsonrpc: '2.0', id: @request_id, method: method, params: params }
ScaleRb.logger.debug "Sending request: #{request}"
@connection.write(request.to_json)

@request_id += 1

response_future.wait
end
end

class JsonRpcRequest
attr_reader :id, :method, :params

def initialize(id, method, params = {})
@id = id
@method = method
@params = params
end

def to_json(*_args)
{ jsonrpc: '2.0', id: @id, method: @method, params: @params }.to_json
end

# def to_s
# to_json
# end
end

class ResponseHandler
def initialize
@handlers = {}
@callbacks = {}
end

# handler: a proc with response data as param
def register(id, handler)
@handlers[id] = handler
# callback: a proc with response data as param
def register(id, callback)
@callbacks[id] = callback
end

def handle(response)
id = response['id']
if @handlers.key?(id)
handler = @handlers[id]
handler.call(response)
@handlers.delete(id)
id = response[:id]
if @callbacks.key?(id)
callback = @callbacks[id]
callback.call(response)
@callbacks.delete(id)
else
ScaleRb.logger.debug "Received a message with unknown id: #{response}"
end
Expand All @@ -186,33 +147,23 @@ def handle(response)

class SubscriptionHandler
def initialize
@subscriptions = {}
@callbacks = {}
end

def subscribe(subscription_id, handler)
@subscriptions[subscription_id] = handler
def subscribe(subscription_id, callback)
@callbacks[subscription_id] = callback
end

def unsubscribe(subscription_id)
@subscriptions.delete(subscription_id)
@callbacks.delete(subscription_id)
end

def handle(notification)
subscription_id = notification.dig('params', 'subscription')
subscription_id = notification.dig(:params, :subscription)
return if subscription_id.nil?

if @subscriptions.key?(subscription_id)
@subscriptions[subscription_id].call(notification)
else
# the subscription_id may be not registered.
# in client.subscribe function,
# ...
# subscription_id = request(method, params)
# @subscription_handler.subscribe(subscription_id, block)
# ...
# the request(method, params) may be slow, so the subscription_id may be not registered when the first notification comes.
sleep 0.01
handle(notification)
if @callbacks.key?(subscription_id)
@callbacks[subscription_id].call(notification)
end
end
end
Expand Down