From d9faf4e2bd1270b83c38993afcce9ec9002224fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 17 Oct 2023 10:52:58 +0200 Subject: [PATCH] Bugfix headers exchange wont match tables (#575) Header exchange couldn't match on tables because a mix of AMQ::Protocol::Table and Hash(String, AMQ::Protocol::Field). This will change to use AMQ::Protocol::Table everywhere. --- CHANGELOG.md | 1 + shard.lock | 2 +- spec/message_routing_spec.cr | 47 ++++++++++++++++--------- spec/policies_spec.cr | 6 ++-- src/lavinmq/exchange/consistent_hash.cr | 4 +-- src/lavinmq/exchange/exchange.cr | 26 ++++++++------ src/lavinmq/exchange/headers.cr | 10 +++--- src/lavinmq/federation/link.cr | 13 ++++--- src/lavinmq/http/controller/bindings.cr | 10 +++--- src/lavinmq/queue/queue.cr | 6 ++-- src/lavinmq/queue/queue_factory.cr | 10 +++--- src/lavinmq/queue/stream_queue.cr | 2 +- src/lavinmq/vhost.cr | 26 +++++++------- 13 files changed, 90 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 16e9983bd0..7274034aa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - A bug in delay exchanges caused messages to be routed to x-dead-letter-exchange instead of bound queues. It also ruined any dead lettering headers. +- A bug that prevented headers exchange to match on table in message headers. ## [1.2.4] - 2023-09-26 diff --git a/shard.lock b/shard.lock index 5da04564cc..ed2b7bb4cd 100644 --- a/shard.lock +++ b/shard.lock @@ -6,7 +6,7 @@ shards: amq-protocol: git: https://github.com/cloudamqp/amq-protocol.cr.git - version: 1.1.10 + version: 1.1.11 amqp-client: git: https://github.com/cloudamqp/amqp-client.cr.git diff --git a/spec/message_routing_spec.cr b/spec/message_routing_spec.cr index f9cba4a1fe..6697da6f29 100644 --- a/spec/message_routing_spec.cr +++ b/spec/message_routing_spec.cr @@ -17,7 +17,7 @@ describe LavinMQ::DirectExchange do vhost = Server.vhosts.create("x") q1 = LavinMQ::Queue.new(vhost, "q1") x = LavinMQ::DirectExchange.new(vhost, "") - x.bind(q1, "q1", Hash(String, LavinMQ::AMQP::Field).new) + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) x.matches("q1").should eq(Set{q1}) end @@ -174,16 +174,16 @@ describe LavinMQ::HeadersExchange do x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true) end - hdrs_all = { + hdrs_all = LavinMQ::AMQP::Table.new({ "x-match" => "all", "org" => "84codes", "user" => "test", - } of String => LavinMQ::AMQP::Field - hdrs_any = { + }) + hdrs_any = LavinMQ::AMQP::Table.new({ "x-match" => "any", "org" => "84codes", "user" => "test", - } of String => LavinMQ::AMQP::Field + }) describe "match all" do it "should match if same args" do @@ -225,15 +225,26 @@ describe LavinMQ::HeadersExchange do msg_hdrs["user"] = "hest" x.matches("", msg_hdrs).size.should eq 0 end + + it "should match nestled amq-protocol tables" do + x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true) + q10 = LavinMQ::Queue.new(vhost, "q10") + bind_hdrs = LavinMQ::AMQP::Table.new({ + "x-match" => "any", + "tbl" => LavinMQ::AMQP::Table.new({"foo": "bar"}), + }) + x.bind(q10, "", bind_hdrs) # to_h because that's what's done in VHost + msg_hdrs = bind_hdrs.clone + msg_hdrs.delete("x-match") + x.matches("", msg_hdrs).size.should eq 1 + end end it "should handle multiple bindings" do q10 = LavinMQ::Queue.new(vhost, "q10") x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true) - hdrs1 = {"x-match" => "any", "org" => "84codes", - "user" => "test"} of String => LavinMQ::AMQP::Field - hdrs2 = {"x-match" => "all", "org" => "google", - "user" => "test"} of String => LavinMQ::AMQP::Field + hdrs1 = LavinMQ::AMQP::Table.new({"x-match" => "any", "org" => "84codes", "user" => "test"}) + hdrs2 = LavinMQ::AMQP::Table.new({"x-match" => "all", "org" => "google", "user" => "test"}) x.bind(q10, "", hdrs1) x.bind(q10, "", hdrs2) @@ -249,12 +260,13 @@ describe LavinMQ::HeadersExchange do hsh = {"k" => "v"} of String => LavinMQ::AMQP::Field arrf = [1] of LavinMQ::AMQP::Field arru = [1_u8] of LavinMQ::AMQP::Field - hdrs = {"Nil" => nil, "Bool" => true, "UInt8" => 1_u8, "UInt16" => 1_u16, "UInt32" => 1_u32, - "Int16" => 1_u16, "Int32" => 1_i32, "Int64" => 1_i64, "Float32" => 1_f32, - "Float64" => 1_f64, "String" => "String", "Array(Field)" => arrf, - "Array(UInt8)" => arru, "Time" => Time.utc, "Hash(String, Field)" => hsh, - "x-match" => "all", - } of String => LavinMQ::AMQP::Field + hdrs = LavinMQ::AMQP::Table.new({ + "Nil" => nil, "Bool" => true, "UInt8" => 1_u8, "UInt16" => 1_u16, "UInt32" => 1_u32, + "Int16" => 1_u16, "Int32" => 1_i32, "Int64" => 1_i64, "Float32" => 1_f32, + "Float64" => 1_f64, "String" => "String", "Array(Field)" => arrf, + "Array(UInt8)" => arru, "Time" => Time.utc, "Hash(String, Field)" => hsh, + "x-match" => "all", + }) x.bind(q11, "", hdrs) x.matches("", hdrs).should eq Set{q11} end @@ -262,8 +274,9 @@ describe LavinMQ::HeadersExchange do it "should handle unbind" do q12 = LavinMQ::Queue.new(vhost, "q12") x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true) - hdrs1 = {"x-match" => "any", "org" => "84codes", - "user" => "test"} of String => LavinMQ::AMQP::Field + hdrs1 = LavinMQ::AMQP::Table.new({ + "x-match" => "any", "org" => "84codes", "user" => "test", + }) x.bind(q12, "", hdrs1) x.unbind(q12, "", hdrs1) x.matches("", hdrs1).size.should eq 0 diff --git a/spec/policies_spec.cr b/spec/policies_spec.cr index cff7402ac8..268a65a8ab 100644 --- a/spec/policies_spec.cr +++ b/spec/policies_spec.cr @@ -218,7 +218,7 @@ describe LavinMQ::VHost do it "arguments should have priority for non numeric arguments" do vhost.exchanges["no-ae"] = LavinMQ::DirectExchange.new(vhost, "no-ae") vhost.exchanges["x-with-ae"] = LavinMQ::DirectExchange.new(vhost, "x-with-ae", - arguments: {"x-alternate-exchange" => "ae2".as(AMQ::Protocol::Field)}) + arguments: AMQ::Protocol::Table.new({"x-alternate-exchange": "ae2"})) vhost.add_policy("test", ".*", "all", definitions, 100_i8) sleep 0.01 vhost.exchanges["no-ae"].@alternate_exchange.should eq "dead-letters" @@ -230,8 +230,8 @@ describe LavinMQ::VHost do end it "should use the lowest value" do - vhost.queues["test1"] = LavinMQ::Queue.new(vhost, "test1", arguments: {"x-max-length" => 1_i64.as(AMQ::Protocol::Field)}) - vhost.queues["test2"] = LavinMQ::Queue.new(vhost, "test2", arguments: {"x-max-length" => 11_i64.as(AMQ::Protocol::Field)}) + vhost.queues["test1"] = LavinMQ::Queue.new(vhost, "test1", arguments: LavinMQ::AMQP::Table.new({"x-max-length" => 1_i64})) + vhost.queues["test2"] = LavinMQ::Queue.new(vhost, "test2", arguments: LavinMQ::AMQP::Table.new({"x-max-length" => 11_i64})) vhost.add_policy("test", ".*", "all", definitions, 100_i8) sleep 0.01 vhost.queues["test1"].@max_length.should eq 1 diff --git a/src/lavinmq/exchange/consistent_hash.cr b/src/lavinmq/exchange/consistent_hash.cr index 59074d9f73..a42796093c 100644 --- a/src/lavinmq/exchange/consistent_hash.cr +++ b/src/lavinmq/exchange/consistent_hash.cr @@ -24,7 +24,7 @@ module LavinMQ end end - def bind(destination : Destination, routing_key : String, headers : Hash(String, AMQP::Field)?) + def bind(destination : Destination, routing_key : String, headers : AMQP::Table?) w = weight(routing_key) @hasher.add(destination.name, w, destination) ret = case destination @@ -37,7 +37,7 @@ module LavinMQ ret end - def unbind(destination : Destination, routing_key : String, headers : Hash(String, AMQP::Field)?) + def unbind(destination : Destination, routing_key : String, headers : AMQP::Table?) w = weight(routing_key) ret = case destination when Queue diff --git a/src/lavinmq/exchange/exchange.cr b/src/lavinmq/exchange/exchange.cr index 528c92b3db..1572d3d855 100644 --- a/src/lavinmq/exchange/exchange.cr +++ b/src/lavinmq/exchange/exchange.cr @@ -6,7 +6,7 @@ require "../observable" require "../queue" module LavinMQ - alias BindingKey = Tuple(String, Hash(String, AMQP::Field)?) + alias BindingKey = Tuple(String, AMQP::Table?) alias Destination = Queue | Exchange abstract class Exchange @@ -30,7 +30,7 @@ module LavinMQ def initialize(@vhost : VHost, @name : String, @durable = false, @auto_delete = false, @internal = false, - @arguments = Hash(String, AMQP::Field).new) + @arguments = AMQP::Table.new) @queue_bindings = Hash(BindingKey, Set(Queue)).new do |h, k| h[k] = Set(Queue).new end @@ -91,12 +91,16 @@ module LavinMQ def match?(type, durable, auto_delete, internal, arguments) delayed = type == "x-delayed-message" - frame_args = arguments.to_h.dup.reject("x-delayed-type").merge({"x-delayed-exchange" => true}) + frame_args = arguments + if delayed + frame_args = frame_args.clone.merge!({"x-delayed-exchange": true}) + frame_args.delete("x-delayed-type") + end self.type == (delayed ? arguments["x-delayed-type"] : type) && @durable == durable && @auto_delete == auto_delete && @internal == internal && - @arguments == (delayed ? frame_args : arguments.to_h) + @arguments == frame_args end def in_use? @@ -123,10 +127,10 @@ module LavinMQ return unless @delayed q_name = "amq.delayed.#{@name}" raise "Exchange name too long" if q_name.bytesize > MAX_NAME_LENGTH - arguments = Hash(String, AMQP::Field){ + arguments = AMQP::Table.new({ "x-dead-letter-exchange" => @name, "auto-delete" => @auto_delete, - } + }) @delayed_queue = if durable? DurableDelayedExchangeQueue.new(@vhost, q_name, false, false, arguments) else @@ -137,7 +141,7 @@ module LavinMQ REPUBLISH_HEADERS = {"x-head", "x-tail", "x-from"} - protected def after_bind(destination : Destination, routing_key : String, headers : Hash(String, AMQP::Field)?) + protected def after_bind(destination : Destination, routing_key : String, headers : AMQP::Table?) notify_observers(:bind, binding_details({routing_key, headers}, destination)) true end @@ -162,10 +166,10 @@ module LavinMQ end abstract def type : String - abstract def bind(destination : Queue, routing_key : String, headers : Hash(String, AMQP::Field)?) - abstract def unbind(destination : Queue, routing_key : String, headers : Hash(String, AMQP::Field)?) - abstract def bind(destination : Exchange, routing_key : String, headers : Hash(String, AMQP::Field)?) - abstract def unbind(destination : Exchange, routing_key : String, headers : Hash(String, AMQP::Field)?) + abstract def bind(destination : Queue, routing_key : String, headers : AMQP::Table?) + abstract def unbind(destination : Queue, routing_key : String, headers : AMQP::Table?) + abstract def bind(destination : Exchange, routing_key : String, headers : AMQP::Table?) + abstract def unbind(destination : Exchange, routing_key : String, headers : AMQP::Table?) abstract def do_queue_matches(routing_key : String, headers : AMQP::Table?, & : Queue -> _) abstract def do_exchange_matches(routing_key : String, headers : AMQP::Table?, & : Exchange -> _) diff --git a/src/lavinmq/exchange/headers.cr b/src/lavinmq/exchange/headers.cr index 34a6810d6e..11578406b3 100644 --- a/src/lavinmq/exchange/headers.cr +++ b/src/lavinmq/exchange/headers.cr @@ -8,14 +8,14 @@ module LavinMQ def initialize(@vhost : VHost, @name : String, @durable = false, @auto_delete = false, @internal = false, - @arguments = Hash(String, AMQP::Field).new) + @arguments = AMQP::Table.new) validate!(@arguments) super end def bind(destination : Queue, routing_key, headers) validate!(headers) - args = headers ? @arguments.merge(headers) : @arguments + args = headers ? @arguments.clone.merge!(headers) : @arguments ret = @queue_bindings[{routing_key, args}].add? destination after_bind(destination, routing_key, headers) ret @@ -23,21 +23,21 @@ module LavinMQ def bind(destination : Exchange, routing_key, headers) validate!(headers) - args = headers ? @arguments.merge(headers) : @arguments + args = headers ? @arguments.clone.merge!(headers) : @arguments ret = @exchange_bindings[{routing_key, args}].add? destination after_bind(destination, routing_key, headers) ret end def unbind(destination : Queue, routing_key, headers) - args = headers ? @arguments.merge(headers) : @arguments + args = headers ? @arguments.clone.merge!(headers) : @arguments ret = @queue_bindings[{routing_key, args}].delete destination after_unbind(destination, routing_key, headers) ret end def unbind(destination : Exchange, routing_key, headers) - args = headers ? @arguments.merge(headers) : @arguments + args = headers ? @arguments.clone.merge!(headers) : @arguments ret = @exchange_bindings[{routing_key, args}].delete destination after_unbind(destination, routing_key, headers) ret diff --git a/src/lavinmq/federation/link.cr b/src/lavinmq/federation/link.cr index 58df340f81..db2b3202d6 100644 --- a/src/lavinmq/federation/link.cr +++ b/src/lavinmq/federation/link.cr @@ -265,13 +265,13 @@ module LavinMQ when :bind with_consumer_q do |q| b = data_as_binding_details(data) - args = ::AMQP::Client::Arguments.new(b.arguments) + args = b.arguments || ::AMQP::Client::Arguments.new q.bind(@upstream_exchange, b.routing_key, args: args) end when :unbind with_consumer_q do |q| b = data_as_binding_details(data) - args = ::AMQP::Client::Arguments.new(b.arguments) + args = b.arguments || ::AMQP::Client::Arguments.new q.unbind(@upstream_exchange, b.routing_key, args: args) end else raise "Unexpected event '#{event}'" @@ -317,10 +317,9 @@ module LavinMQ end private def setup(upstream_client) - args = ::AMQP::Client::Arguments.new(@federated_ex.arguments) ch, _ = try_passive(upstream_client) do |uch, passive| uch.exchange(@upstream_exchange, type: @federated_ex.type, - args: args, passive: passive) + args: @federated_ex.arguments, passive: passive) end args2 = ::AMQP::Client::Arguments.new({ "x-downstream-name" => System.hostname, @@ -331,7 +330,7 @@ module LavinMQ uch.exchange(@upstream_q, type: "x-federation-upstream", args: args2, passive: passive) end - q_args = Hash(String, AMQP::Field){"x-internal-purpose" => "federation"} + q_args = ::AMQP::Client::Arguments.new({"x-internal-purpose" => "federation"}) if expires = @upstream.expires q_args["x-expires"] = expires end @@ -339,11 +338,11 @@ module LavinMQ q_args["x-message-ttl"] = msg_ttl end ch, q = try_passive(upstream_client, ch) do |uch, passive| - uch.queue(@upstream_q, args: ::AMQP::Client::Arguments.new(q_args), passive: passive) + uch.queue(@upstream_q, args: q_args, passive: passive) end @federated_ex.register_observer(self) @federated_ex.bindings_details.each do |binding| - args = ::AMQP::Client::Arguments.new(binding.arguments) + args = binding.arguments || ::AMQP::Client::Arguments.new q.bind(@upstream_exchange, binding.routing_key, args: args) end {ch, q} diff --git a/src/lavinmq/http/controller/bindings.cr b/src/lavinmq/http/controller/bindings.cr index 74a967f8d2..c19bc5d10d 100644 --- a/src/lavinmq/http/controller/bindings.cr +++ b/src/lavinmq/http/controller/bindings.cr @@ -63,7 +63,7 @@ module LavinMQ bad_request(context, "Field 'routing_key' is required") end ok = e.vhost.bind_queue(q.name, e.name, routing_key, arguments) - props = BindingDetails.hash_key({routing_key, arguments.to_h}) + props = BindingDetails.hash_key({routing_key, arguments}) context.response.headers["Location"] = q.name + "/" + props context.response.status_code = 201 Log.debug do @@ -101,8 +101,8 @@ module LavinMQ found = false e.queue_bindings.each do |k, destinations| next unless destinations.includes?(q) && BindingDetails.hash_key(k) == props - arguments = k[1] || Hash(String, AMQP::Field).new - @amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, k[0], AMQP::Table.new arguments) + arguments = k[1] || AMQP::Table.new + @amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, k[0], arguments) found = true Log.debug { "exchange '#{e.name}' unbound from queue '#{q.name}' with key '#{k}'" } break @@ -144,7 +144,7 @@ module LavinMQ bad_request(context, "Field 'routing_key' is required") end source.vhost.bind_exchange(destination.name, source.name, routing_key, arguments) - props = BindingDetails.hash_key({routing_key, arguments.to_h}) + props = BindingDetails.hash_key({routing_key, arguments}) context.response.headers["Location"] = context.request.path + "/" + props context.response.status_code = 201 end @@ -180,7 +180,7 @@ module LavinMQ found = false source.exchange_bindings.each do |k, destinations| next unless destinations.includes?(destination) && BindingDetails.hash_key(k) == props - arguments = AMQP::Table.new(k[1] || Hash(String, AMQP::Field).new) + arguments = k[1] || AMQP::Table.new @amqp_server.vhosts[vhost].unbind_exchange(destination.name, source.name, k[0], arguments) found = true break diff --git a/src/lavinmq/queue/queue.cr b/src/lavinmq/queue/queue.cr index 2f935d8bd5..7e00f043cc 100644 --- a/src/lavinmq/queue/queue.cr +++ b/src/lavinmq/queue/queue.cr @@ -133,7 +133,7 @@ module LavinMQ def initialize(@vhost : VHost, @name : String, @exclusive = false, @auto_delete = false, - @arguments = Hash(String, AMQP::Field).new) + @arguments = AMQP::Table.new) @last_get_time = RoughTime.monotonic @log = Log.for "queue[vhost=#{@vhost.name} name=#{@name}]" @data_dir = make_data_dir @@ -871,14 +871,14 @@ module LavinMQ durable? == frame.durable && @exclusive == frame.exclusive && @auto_delete == frame.auto_delete && - @arguments == frame.arguments.to_h + @arguments == frame.arguments end def match?(durable, exclusive, auto_delete, arguments) durable? == durable && @exclusive == exclusive && @auto_delete == auto_delete && - @arguments == arguments.to_h + @arguments == arguments end def in_use? diff --git a/src/lavinmq/queue/queue_factory.cr b/src/lavinmq/queue/queue_factory.cr index 9706d6642c..1cf992fe74 100644 --- a/src/lavinmq/queue/queue_factory.cr +++ b/src/lavinmq/queue/queue_factory.cr @@ -17,26 +17,26 @@ module LavinMQ private def self.make_durable(vhost, frame) if prio_queue? frame - DurablePriorityQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments.to_h) + DurablePriorityQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments) elsif stream_queue? frame if frame.exclusive raise Error::PreconditionFailed.new("A stream queue cannot be exclusive") elsif frame.auto_delete raise Error::PreconditionFailed.new("A stream queue cannot be auto-delete") end - StreamQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments.to_h) + StreamQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments) else - DurableQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments.to_h) + DurableQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments) end end private def self.make_queue(vhost, frame) if prio_queue? frame - PriorityQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments.to_h) + PriorityQueue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments) elsif stream_queue? frame raise Error::PreconditionFailed.new("A stream queue cannot be non-durable") else - Queue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments.to_h) + Queue.new(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments) end end diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr index 5b00f23efa..5d784d7eeb 100644 --- a/src/lavinmq/queue/stream_queue.cr +++ b/src/lavinmq/queue/stream_queue.cr @@ -6,7 +6,7 @@ module LavinMQ class StreamQueue < DurableQueue def initialize(@vhost : VHost, @name : String, @exclusive = false, @auto_delete = false, - @arguments = Hash(String, AMQP::Field).new) + @arguments = AMQP::Table.new) super spawn unmap_unused_segments_loop, name: "StreamQueue#unmap_unused_segments_loop" end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index d48050068b..6c1b37fa4b 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -283,7 +283,7 @@ module LavinMQ when AMQP::Frame::Exchange::Declare return false if @exchanges.has_key? f.exchange_name e = @exchanges[f.exchange_name] = - make_exchange(self, f.exchange_name, f.exchange_type, f.durable, f.auto_delete, f.internal, f.arguments.to_h) + make_exchange(self, f.exchange_name, f.exchange_type, f.durable, f.auto_delete, f.internal, f.arguments) apply_policies([e] of Exchange) unless loading store_definition(f) if !loading && f.durable when AMQP::Frame::Exchange::Delete @@ -301,12 +301,12 @@ module LavinMQ when AMQP::Frame::Exchange::Bind src = @exchanges[f.source]? || return false dst = @exchanges[f.destination]? || return false - return false unless src.bind(dst, f.routing_key, f.arguments.to_h) + return false unless src.bind(dst, f.routing_key, f.arguments) store_definition(f) if !loading && src.durable? && dst.durable? when AMQP::Frame::Exchange::Unbind src = @exchanges[f.source]? || return false dst = @exchanges[f.destination]? || return false - return false unless src.unbind(dst, f.routing_key, f.arguments.to_h) + return false unless src.unbind(dst, f.routing_key, f.arguments) store_definition(f, dirty: true) if !loading && src.durable? && dst.durable? when AMQP::Frame::Queue::Declare return false if @queues.has_key? f.queue_name @@ -330,12 +330,12 @@ module LavinMQ when AMQP::Frame::Queue::Bind x = @exchanges[f.exchange_name]? || return false q = @queues[f.queue_name]? || return false - return false unless x.bind(q, f.routing_key, f.arguments.to_h) + return false unless x.bind(q, f.routing_key, f.arguments) store_definition(f) if !loading && x.durable? && q.durable? && !q.exclusive? when AMQP::Frame::Queue::Unbind x = @exchanges[f.exchange_name]? || return false q = @queues[f.queue_name]? || return false - return false unless x.unbind(q, f.routing_key, f.arguments.to_h) + return false unless x.unbind(q, f.routing_key, f.arguments) store_definition(f, dirty: true) if !loading && x.durable? && q.durable? && !q.exclusive? else raise "Cannot apply frame #{f.class} in vhost #{@name}" end @@ -583,26 +583,26 @@ module LavinMQ @exchanges.each_value.select(&.durable?).each do |e| f = AMQP::Frame::Exchange::Declare.new(0_u16, 0_u16, e.name, e.type, false, e.durable?, e.auto_delete?, e.internal?, - false, AMQP::Table.new(e.arguments)) + false, e.arguments) io.write_bytes f end @queues.each_value.select(&.durable?).each do |q| f = AMQP::Frame::Queue::Declare.new(0_u16, 0_u16, q.name, false, q.durable?, q.exclusive?, - q.auto_delete?, false, AMQP::Table.new(q.arguments)) + q.auto_delete?, false, q.arguments) io.write_bytes f end @exchanges.each_value.select(&.durable?).each do |e| - e.queue_bindings.each do |bt, queues| - args = AMQP::Table.new(bt[1]) || AMQP::Table.new + e.queue_bindings.each do |(routing_key, arguments), queues| + args = arguments || AMQP::Table.new queues.each do |q| - f = AMQP::Frame::Queue::Bind.new(0_u16, 0_u16, q.name, e.name, bt[0], false, args) + f = AMQP::Frame::Queue::Bind.new(0_u16, 0_u16, q.name, e.name, routing_key, false, args) io.write_bytes f end end - e.exchange_bindings.each do |bt, exchanges| - args = AMQP::Table.new(bt[1]) || AMQP::Table.new + e.exchange_bindings.each do |(routing_key, arguments), exchanges| + args = arguments || AMQP::Table.new exchanges.each do |ex| - f = AMQP::Frame::Exchange::Bind.new(0_u16, 0_u16, ex.name, e.name, bt[0], false, args) + f = AMQP::Frame::Exchange::Bind.new(0_u16, 0_u16, ex.name, e.name, routing_key, false, args) io.write_bytes f end end