From 13fa435e9fc8393421ab314744cb50e94dd2e5b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 9 Aug 2024 10:12:14 +0200 Subject: [PATCH] Delay consumer recovery after cancel Schedule the recovery after 2 seconds. This can avoid timing issue (queue-declare passive returns the queue exists but basic-consume fails because the queue does not exist). --- src/main/java/com/rabbitmq/perf/Consumer.java | 25 +++++++++++++++---- .../com/rabbitmq/perf/TopologyRecording.java | 12 +++------ 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/rabbitmq/perf/Consumer.java b/src/main/java/com/rabbitmq/perf/Consumer.java index bff8b9af..2635a9e0 100644 --- a/src/main/java/com/rabbitmq/perf/Consumer.java +++ b/src/main/java/com/rabbitmq/perf/Consumer.java @@ -364,14 +364,27 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig } @Override - public void handleCancel(String consumerTag) throws IOException { + public void handleCancel(String consumerTag) { System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag); epochMessageCount.set(0); if (consumerTagBranchMap.containsKey(consumerTag)) { String qName = consumerTagBranchMap.get(consumerTag); - TopologyRecording topologyRecording = topologyRecording(); - RecordedQueue queueRecord = topologyRecording.queue(qName); - consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName); + Duration delay = Duration.ofSeconds(2); + LOGGER.debug("Scheduling consumer recovery after broker cancellation ({})", delay); + topologyRecoveryScheduledExecutorService.schedule( + () -> { + TopologyRecording topologyRecording = topologyRecording(); + RecordedQueue queueRecord = topologyRecording.queue(qName); + try { + consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName); + } catch (IOException e) { + LOGGER.info( + "Error while recovering consumer after broker cancellation: {}", + e.getMessage()); + } + }, + delay.toMillis(), + TimeUnit.MILLISECONDS); } else { System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag); } @@ -476,7 +489,9 @@ private void consumeOrScheduleConsume( resubscription, schedulingPeriod.getSeconds(), TimeUnit.SECONDS); } } else { - if (!queueExists) { + if (queueExists) { + LOGGER.debug("Queue {} does exist, subscribing", queueName); + } else { // the queue seems to have been deleted, re-creating it with its bindings LOGGER.debug( "Queue {} does not exist, trying to re-create it before re-subscribing", queueName); diff --git a/src/main/java/com/rabbitmq/perf/TopologyRecording.java b/src/main/java/com/rabbitmq/perf/TopologyRecording.java index e41c40cf..e9585acb 100644 --- a/src/main/java/com/rabbitmq/perf/TopologyRecording.java +++ b/src/main/java/com/rabbitmq/perf/TopologyRecording.java @@ -326,7 +326,7 @@ boolean isCluster() { return this.cluster; } - class RecordedExchange { + static class RecordedExchange { private final String name, type; @@ -341,14 +341,14 @@ public String toString() { } } - class RecordedQueue { + static class RecordedQueue { private final boolean durable; private final boolean exclusive; private final boolean autoDelete; private final Map arguments; private final boolean serverNamed; - private String name; + private volatile String name; private RecordedQueue( String name, @@ -373,10 +373,6 @@ public boolean isAutoDelete() { return autoDelete; } - public boolean isServerNamed() { - return serverNamed; - } - public boolean isExclusive() { return exclusive; } @@ -421,7 +417,7 @@ public String toString() { } } - class RecordedBinding { + static class RecordedBinding { private final String queue, exchange, routingKey;