Skip to content

Commit

Permalink
allow src-consumer-args on shovels to be able to shovel stream queues
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Oct 17, 2023
1 parent 9d5cf96 commit 878bcaf
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/lavinmq/shovel/shovel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ module LavinMQ

def initialize(@name : String, @uri : URI, @queue : String?, @exchange : String? = nil,
@exchange_key : String? = nil,
@delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH, @ack_mode = DEFAULT_ACK_MODE,
@delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH,
@ack_mode = DEFAULT_ACK_MODE, @consumer_args = {} of String => JSON::Any || nil,
direct_user : User? = nil)
@tag = "Shovel[#{@name}]"
cfg = Config.instance
Expand All @@ -60,6 +61,10 @@ module LavinMQ
if @queue.nil? && @exchange.nil?
raise ArgumentError.new("Shovel source requires a queue or an exchange")
end
@args = AMQ::Protocol::Table.new()
@consumer_args.try &.each do |k, v|
@args[k] = v
end
end

def start
Expand Down Expand Up @@ -122,6 +127,7 @@ module LavinMQ
no_ack: @ack_mode.no_ack?,
exclusive: true,
block: true,
args: @args,
tag: @tag) do |msg|
blk.call(msg)

Expand Down Expand Up @@ -154,7 +160,8 @@ module LavinMQ

def initialize(@name : String, @uri : URI, @queue : String?, @exchange : String? = nil,
@exchange_key : String? = nil,
@delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH, @ack_mode = DEFAULT_ACK_MODE,
@delete_after = DEFAULT_DELETE_AFTER, @prefetch = DEFAULT_PREFETCH,
@ack_mode = DEFAULT_ACK_MODE, @consumer_args = {} of String => String,
direct_user : User? = nil)
cfg = Config.instance
@uri.host ||= "#{cfg.amqp_bind}:#{cfg.amqp_port}"
Expand All @@ -176,6 +183,10 @@ module LavinMQ
if @exchange.nil?
raise ArgumentError.new("Shovel destination requires an exchange")
end
@args = AMQ::Protocol::Table.new()
@consumer_args.try &.each do |k, v|
@args[k] = v
end
end

def start
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/shovel/shovel_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module LavinMQ
delete_after,
prefetch,
ack_mode,
config["src-consumer-args"]?.try &.as_h?,
direct_user: @vhost.users.direct_user)
dest = destination(name, config, ack_mode, delete_after, prefetch)
shovel = Shovel::Runner.new(src, dest, name, @vhost, reconnect_delay)
Expand Down

0 comments on commit 878bcaf

Please sign in to comment.