diff --git a/src/lavinmq/shovel/shovel.cr b/src/lavinmq/shovel/shovel.cr index fb31b33ca9..feab2c3c1b 100644 --- a/src/lavinmq/shovel/shovel.cr +++ b/src/lavinmq/shovel/shovel.cr @@ -41,7 +41,8 @@ module LavinMQ def initialize(@name : String, @uri : URI, @queue : String?, @exchange : String? = nil, @exchange_key : String? = nil, - @delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH, @ack_mode = DEFAULT_ACK_MODE, + @delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH, + @ack_mode = DEFAULT_ACK_MODE, @consumer_args = {} of String => JSON::Any || nil, direct_user : User? = nil) @tag = "Shovel[#{@name}]" cfg = Config.instance @@ -60,6 +61,10 @@ module LavinMQ if @queue.nil? && @exchange.nil? raise ArgumentError.new("Shovel source requires a queue or an exchange") end + @args = AMQ::Protocol::Table.new() + @consumer_args.try &.each do |k, v| + @args[k] = v + end end def start @@ -122,6 +127,7 @@ module LavinMQ no_ack: @ack_mode.no_ack?, exclusive: true, block: true, + args: @args, tag: @tag) do |msg| blk.call(msg) @@ -154,7 +160,8 @@ module LavinMQ def initialize(@name : String, @uri : URI, @queue : String?, @exchange : String? = nil, @exchange_key : String? = nil, - @delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH, @ack_mode = DEFAULT_ACK_MODE, + @delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH, + @ack_mode = DEFAULT_ACK_MODE, @consumer_args = {} of String => String, direct_user : User? = nil) cfg = Config.instance @uri.host ||= "#{cfg.amqp_bind}:#{cfg.amqp_port}" @@ -176,6 +183,10 @@ module LavinMQ if @exchange.nil? raise ArgumentError.new("Shovel destination requires an exchange") end + @args = AMQ::Protocol::Table.new() + @consumer_args.try &.each do |k, v| + @args[k] = v + end end def start diff --git a/src/lavinmq/shovel/shovel_store.cr b/src/lavinmq/shovel/shovel_store.cr index d0487255c3..3b1b908230 100644 --- a/src/lavinmq/shovel/shovel_store.cr +++ b/src/lavinmq/shovel/shovel_store.cr @@ -23,6 +23,7 @@ module LavinMQ delete_after, prefetch, ack_mode, + config["src-consumer-args"]?.try &.as_h?, direct_user: @vhost.users.direct_user) dest = destination(name, config, ack_mode, delete_after, prefetch) shovel = Shovel::Runner.new(src, dest, name, @vhost, reconnect_delay)