forked from rabbitmq/rabbitmq-tutorials
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc_client.rb
executable file
·66 lines (54 loc) · 1.45 KB
/
rpc_client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env ruby
# encoding: utf-8
# Note: This is just proof of concept. For
# real-world usage, you are strongly advised
# to use https://github.com/ruby-amqp/rpc
# or some other RPC library.
require "amqp"
class FibonacciRpcClient
def initialize
subscribe_to_callback_queue
end
def connection
@connection ||= AMQP.connect(:host => "localhost")
end
def channel
@channel ||= AMQP::Channel.new(self.connection)
end
def callback_queue
@callback_queue ||= self.channel.queue("", :exclusive => true)
end
def requests
@requests ||= Hash.new
end
def call(n, &block)
corr_id = rand(10_000_000).to_s
self.requests[corr_id] = nil
self.callback_queue.append_callback(:declare) do
self.channel.default_exchange.publish(n.to_s, :routing_key => "rpc_queue", :reply_to => self.callback_queue.name, :correlation_id => corr_id)
EM.add_periodic_timer(0.1) do
# p self.requests
if result = self.requests[corr_id]
block.call(result.to_i)
EM.stop
end
end
end
end
private
def subscribe_to_callback_queue
self.callback_queue.subscribe do |header, body|
corr_id = header.correlation_id
unless self.requests[corr_id]
self.requests[corr_id] = body
end
end
end
end
EM.run do
fibonacci_rpc = FibonacciRpcClient.new()
puts " [x] Requesting fib(30)"
fibonacci_rpc.call(30) do |response|
puts " [.] Got #{response}"
end
end