Skip to content

Commit

Permalink
Delay consumer recovery after cancel
Browse files Browse the repository at this point in the history
Schedule the recovery after 2 seconds. This can avoid timing issue
(queue-declare passive returns the queue exists but basic-consume fails
because the queue does not exist).
  • Loading branch information
acogoluegnes committed Aug 9, 2024
1 parent 153aa8f commit 13fa435
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
25 changes: 20 additions & 5 deletions src/main/java/com/rabbitmq/perf/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,27 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
}

@Override
public void handleCancel(String consumerTag) throws IOException {
public void handleCancel(String consumerTag) {
System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
epochMessageCount.set(0);
if (consumerTagBranchMap.containsKey(consumerTag)) {
String qName = consumerTagBranchMap.get(consumerTag);
TopologyRecording topologyRecording = topologyRecording();
RecordedQueue queueRecord = topologyRecording.queue(qName);
consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName);
Duration delay = Duration.ofSeconds(2);
LOGGER.debug("Scheduling consumer recovery after broker cancellation ({})", delay);
topologyRecoveryScheduledExecutorService.schedule(
() -> {
TopologyRecording topologyRecording = topologyRecording();
RecordedQueue queueRecord = topologyRecording.queue(qName);
try {
consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName);
} catch (IOException e) {
LOGGER.info(
"Error while recovering consumer after broker cancellation: {}",
e.getMessage());
}
},
delay.toMillis(),
TimeUnit.MILLISECONDS);
} else {
System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
}
Expand Down Expand Up @@ -476,7 +489,9 @@ private void consumeOrScheduleConsume(
resubscription, schedulingPeriod.getSeconds(), TimeUnit.SECONDS);
}
} else {
if (!queueExists) {
if (queueExists) {
LOGGER.debug("Queue {} does exist, subscribing", queueName);
} else {
// the queue seems to have been deleted, re-creating it with its bindings
LOGGER.debug(
"Queue {} does not exist, trying to re-create it before re-subscribing", queueName);
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/com/rabbitmq/perf/TopologyRecording.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ boolean isCluster() {
return this.cluster;
}

class RecordedExchange {
static class RecordedExchange {

private final String name, type;

Expand All @@ -341,14 +341,14 @@ public String toString() {
}
}

class RecordedQueue {
static class RecordedQueue {

private final boolean durable;
private final boolean exclusive;
private final boolean autoDelete;
private final Map<String, Object> arguments;
private final boolean serverNamed;
private String name;
private volatile String name;

private RecordedQueue(
String name,
Expand All @@ -373,10 +373,6 @@ public boolean isAutoDelete() {
return autoDelete;
}

public boolean isServerNamed() {
return serverNamed;
}

public boolean isExclusive() {
return exclusive;
}
Expand Down Expand Up @@ -421,7 +417,7 @@ public String toString() {
}
}

class RecordedBinding {
static class RecordedBinding {

private final String queue, exchange, routingKey;

Expand Down

0 comments on commit 13fa435

Please sign in to comment.