From 45c2a2faf2ccd1ade156cc9abd74d3a2701952ff Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Mon, 25 Sep 2023 19:07:19 +0200 Subject: [PATCH] Bugfix corrupt dlx headers (#574) Updated amq-protocol.cr to v1.1.10 to fix issue. Also added spec. --- shard.lock | 2 +- spec/dlx_spec.cr | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 spec/dlx_spec.cr diff --git a/shard.lock b/shard.lock index 50d4a11195..5da04564cc 100644 --- a/shard.lock +++ b/shard.lock @@ -6,7 +6,7 @@ shards: amq-protocol: git: https://github.com/cloudamqp/amq-protocol.cr.git - version: 1.1.9 + version: 1.1.10 amqp-client: git: https://github.com/cloudamqp/amqp-client.cr.git diff --git a/spec/dlx_spec.cr b/spec/dlx_spec.cr new file mode 100644 index 0000000000..cf20b95f05 --- /dev/null +++ b/spec/dlx_spec.cr @@ -0,0 +1,32 @@ +require "./spec_helper" +require "./../src/lavinmq/queue" + +describe "Dead lettering" do + q_name = "ttl" + q_name_delayed = "ttl_delayed" + q_name_delayed_2 = "ttl_delayed_2" + + # Verifies bugfix for Sub-table memory corruption in amq-protocol.cr + # https://github.com/cloudamqp/amq-protocol.cr/pull/14 + it "should be able to read messages that has been dead lettered multiple times" do + with_channel do |ch| + q_delayed_2 = ch.queue(q_name_delayed_2, args: AMQP::Client::Arguments.new( + {"x-message-ttl" => 1, "x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => q_name_delayed} + )) + q_delayed = ch.queue(q_name_delayed, args: AMQP::Client::Arguments.new( + {"x-message-ttl" => 1, "x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => q_name} + )) + q = ch.queue(q_name) + + x = ch.default_exchange + x.publish_confirm("ttl", q_delayed_2.name) + msg = wait_for { q.get } + + x_death = msg.properties.headers.not_nil!["x-death"].as(Array(AMQ::Protocol::Field)) + x_death.inspect.should be_a(String) # checks that message and headers can be read + x_death.size.should eq 2 + x_death[0].as(AMQ::Protocol::Table)["queue"].should eq q_delayed.name + x_death[1].as(AMQ::Protocol::Table)["queue"].should eq q_delayed_2.name + end + end +end