Cancelling a consume call does not work as expected #506
Comments
Yes, that is strange. What consumer tag does the application report if you also install an onSuccess callback:
Also watch out: m.body() might not be null-terminated.
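To illustrate the null-termination point: streaming `message.body()` directly treats the pointer as a C string, so the output can run past the payload. A minimal sketch of the safer pattern is below. It assumes the message type exposes `body()` and `bodySize()`, as `AMQP::Message` does; `FakeMessage` and `bodyToString` are hypothetical names used only so the snippet compiles without the library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for AMQP::Message, exposing only the two accessors
// used here: body() (pointer to the payload) and bodySize() (payload length).
struct FakeMessage {
    const char *data;
    uint64_t size;
    const char *body() const { return data; }
    uint64_t bodySize() const { return size; }
};

// Copy the payload into a std::string using the explicit length, so the
// result does not depend on a terminating '\0' being present.
template <typename Message>
std::string bodyToString(const Message &m) {
    if (m.bodySize() == 0) {
        return std::string();           // nothing to copy, body() may be null
    }
    assert(m.body() != nullptr);        // a non-empty payload needs a valid pointer
    return std::string(m.body(), static_cast<std::size_t>(m.bodySize()));
}
```

In a callback this would be used as `std::cout << bodyToString(message) << std::endl;` instead of streaming `message.body()`.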
Oh wow, that was already a good question. It turns out that was a bug on my part, introduced while attempting to solve the problem with automatically assigned tags. Long story short, the DeferredConsume::onSuccess() and DeferredCancel::onSuccess() callbacks do get the correct consumerTag passed.
Unfortunately, the problem that consumption continues persists, but printing in the callbacks has shown something else: I'm cancelling a consumer before getting the DeferredConsume::onSuccess() callback. AMQP-CPP reports success for the cancellation of the (potentially not yet existing?) consumer. Could this be what's happening? And if so, should DeferredCancel::onSuccess() be executed if a non-existing consumer has been cancelled?
Is there a way for me to perform the consume and cancel calls blocking with AMQP-CPP? Nesting everything within the DeferredConsume::onSuccess() callback would not be ideal, as the nesting is already pretty deep and the consume call is done elsewhere in the code.
I might have found a(nother?) bug while trying to fix this: rejecting a message doesn't seem to actually requeue it in the same place; another consumer will get subsequent messages, even when setting m_channel->setQos(1,true):
m_channel->setQos(1, true);
m_queueCancelled = false;
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([&](const AMQP::Message &message,
        uint64_t deliveryTag, bool redelivered)
{
    if (m_queueCancelled) {
        std::cout << "queue already cancelled, rejecting: " << message.body() << std::endl;
        m_channel->reject(deliveryTag, AMQP::requeue);
    } else {
        // process message
        std::cout << "consuming normally: " << message.body() << std::endl;
        m_channel->ack(deliveryTag);
    }
}).onSuccess([]() { std::cout << "consuming started" << std::endl; });
// everything below is nested in the cancel().onSuccess()
m_channel->cancel("hardcodedConsumerTag").onSuccess([&](std::string consumer)
{
    m_queueCancelled = true;
    std::cout << "should have stopped consuming for: " << consumer << std::endl;
    m_channel->get(queueName).onSuccess([&](const AMQP::Message &message, uint64_t deliveryTag,
            bool redelivered) {
        std::cout << "printing in get: " << message.body() << std::endl;
        m_channel->ack(deliveryTag);
        // emits a Qt signal that calls m_channel->get with the same arguments
        // recursively until DeferredGet::onEmpty() is called
        callGetUntilEmpty();
    }).onEmpty([&]() {
        std::cout << "queue empty" << std::endl;
        m_queueCancelled = false;
        m_channel->consume("hardcodedConsumerTag");
    });
});
and the output is
I need to preserve the message order, so having rejected messages not return to the same place is really inconvenient... I guess this might be RabbitMQ config, but I'm all ears if you happen to know how to achieve that :) Note that when I neither acknowledge nor reject the message in the supposedly cancelled consume call, m_channel->get() will still get message2 and message3; I thought this would not happen with setQos(1,true).
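On the question of cancelling before DeferredConsume::onSuccess() has fired without nesting everything inside that callback: one possible approach is a small state holder that remembers a pending cancel and issues it only once the consume has been confirmed. The sketch below is not part of AMQP-CPP. `ConsumerGate` and all of its member names are hypothetical, and whether this changes the behaviour reported in this issue has not been verified.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper (not part of AMQP-CPP): postpones a cancel request
// until the broker has confirmed the consumer.
class ConsumerGate {
public:
    using CancelFn = std::function<void(const std::string &)>;

    explicit ConsumerGate(CancelFn cancel) : m_cancel(std::move(cancel)) {}

    // Call right after issuing the consume request.
    void consumeRequested() {
        m_state = State::Starting;
        m_cancelPending = false;
    }

    // Call from the consume onSuccess callback with the confirmed tag.
    void consumeConfirmed(const std::string &tag) {
        m_tag = tag;
        m_state = State::Active;
        if (m_cancelPending) {          // a cancel arrived too early: send it now
            m_cancelPending = false;
            issueCancel();
        }
    }

    // Call wherever the application wants to stop consuming.
    void requestCancel() {
        if (m_state == State::Active) {
            issueCancel();
        } else if (m_state == State::Starting) {
            m_cancelPending = true;     // remember it until consume is confirmed
        }
        // Idle or Cancelling: nothing to do
    }

    // Call from the cancel onSuccess callback.
    void cancelConfirmed() {
        m_state = State::Idle;
        m_tag.clear();
    }

    bool isIdle() const { return m_state == State::Idle; }

private:
    enum class State { Idle, Starting, Active, Cancelling };

    void issueCancel() {
        assert(m_state == State::Active);   // only a confirmed consumer is cancelled
        m_state = State::Cancelling;
        m_cancel(m_tag);
    }

    CancelFn m_cancel;
    std::string m_tag;
    State m_state = State::Idle;
    bool m_cancelPending = false;
};
```

With the code from this comment, the `CancelFn` would wrap `m_channel->cancel(tag)`, `consumeConfirmed` would be called from the consume `onSuccess` callback, and `cancelConfirmed` from the cancel `onSuccess` callback. The consume and cancel calls can then stay in separate places in the code.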
Consuming from a queue works, as does publishing. I don't use exchanges or anything, just one consumer on one queue. After I cancel the consumer, I still receive messages even though the DeferredCancel::onSuccess callback has been executed. Also, the std::string consumer argument passed to that callback is empty; should this be the consumerTag again? I'm using the Qt5 event loop and the most recent RabbitMQ release.
Here's what I'm observing:
output:
message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until ALL messages have been delivered
I would have expected the messages to stop after the output "should have stopped consuming" is printed.
When attempting to start consuming under hardcodedConsumerTag again several seconds later, RabbitMQ returns "error - consumerTag already in use", which I interpret to mean the previous consumer is still registered. RabbitMQ also continues to deliver messages to the original consumer until I manually exit the application.