Skip to content

Commit

Permalink
ARTEMIS-4794 configure pending ack behavior for bridge
Browse files Browse the repository at this point in the history
When a bridge is stopped it doesn't wait for pending send
acknowledgements to arrive. However, when a bridge is paused it does
wait. The behavior should be consistent and more importantly
configurable. This commit implements these improvements and generally
refactors BridgeImpl to clarify and simplify the code. In total, this
commit includes the follow changes:

 - Removes the hard-coded 60-second timeout for pending acks when
   pausing the bridge and adds a new config parameter (i.e.
   "pending-ack-timeout").
 - Applies the new pending-ack-timeout when the bridge is stopped.
 - Updates existing and adds new logging messages for clarity.
 - De-duplicates code for sending bridge-related notifications.
 - Avoids converting bridge name to/from SimpleString.
 - Removes unnecessary comments.
 - Renames variables & functions for clarity.
 - Replaces the `started`, `stopping`, & `active` booleans with a
   single `state` variable which is an enum.
 - Adds `final` to a few variables that were functionally final.
 - Synchronizes `stop` & `pause` methods to add safety when invoked
   concurrently with `handle` (since both deal with `state` and execute
   runnables on the ordered executor).
 - Reorganizes and removes a few methods for clarity.
 - Relocates `connect` method directly into `ConnectRunnable` (mirroring
   the structure of the `StopRunnable` and `PauseRunnable`).
 - Eliminates unnecessary variables in `ConnectRunnable` and
   `ScheduledConnectRunnable`.
 - Adds test to verify pending ack timeout works as expected with both
   `stop` & `pause` with both regular and large messages.
  • Loading branch information
jbertram authored and gemmellr committed Jul 15, 2024
1 parent 1361e29 commit 19d8059
Show file tree
Hide file tree
Showing 16 changed files with 555 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// Number of concurrent workers for a core bridge
public static int DEFAULT_BRIDGE_CONCURRENCY = 1;

// How long to wait for acknowledgements to arrive from the bridge's target while stopping or pausing the bridge
public static long DEFAULT_BRIDGE_PENDING_ACK_TIMEOUT = 60000;

// Whether or not to report Netty pool metrics
private static final boolean DEFAULT_NETTY_POOL_METRICS = false;

Expand Down Expand Up @@ -1860,6 +1863,10 @@ public static int getDefaultBridgeConcurrency() {
return DEFAULT_BRIDGE_CONCURRENCY;
}

public static long getDefaultBridgePendingAckTimeout() {
return DEFAULT_BRIDGE_PENDING_ACK_TIMEOUT;
}

/**
* Whether or not to report Netty pool metrics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class BridgeConfiguration implements Serializable {
public static String ROUTING_TYPE = "routing-type";
public static String CONCURRENCY = "concurrency";
public static String CONFIGURATION_MANAGED = "configuration-managed";
public static String PENDING_ACK_TIMEOUT = "pending-ack-timeout";

private String name = null;

Expand Down Expand Up @@ -120,6 +121,8 @@ public final class BridgeConfiguration implements Serializable {

private int concurrency = ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency();

private long pendingAckTimeout = ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout();

private String parentName = null;

private boolean configurationManaged = true;
Expand Down Expand Up @@ -155,6 +158,7 @@ public BridgeConfiguration(BridgeConfiguration other) {
routingType = other.routingType;
concurrency = other.concurrency;
configurationManaged = other.configurationManaged;
pendingAckTimeout = other.pendingAckTimeout;
}

public BridgeConfiguration(String name) {
Expand Down Expand Up @@ -261,6 +265,8 @@ public BridgeConfiguration set(String key, String value) {
setRoutingType(ComponentConfigurationRoutingType.valueOf(value));
} else if (key.equals(CONCURRENCY)) {
setConcurrency(Integer.parseInt(value));
} else if (key.equals(PENDING_ACK_TIMEOUT)) {
setPendingAckTimeout(Long.parseLong(value));
}
}
return this;
Expand Down Expand Up @@ -570,6 +576,21 @@ public BridgeConfiguration setConcurrency(int concurrency) {
return this;
}

/**
* @return the bridge pending ack timeout
*/
public long getPendingAckTimeout() {
return pendingAckTimeout;
}

/**
* @param pendingAckTimeout the bridge pending ack timeout to set
*/
public BridgeConfiguration setPendingAckTimeout(long pendingAckTimeout) {
this.pendingAckTimeout = pendingAckTimeout;
return this;
}

/**
* At this point this is only changed on testcases
* The bridge shouldn't be sending blocking anyways
Expand Down Expand Up @@ -631,6 +652,7 @@ public String toJSON() {
builder.add(CALL_TIMEOUT, getCallTimeout());
builder.add(CONCURRENCY, getConcurrency());
builder.add(CONFIGURATION_MANAGED, isConfigurationManaged());
builder.add(PENDING_ACK_TIMEOUT, getPendingAckTimeout());

// complex fields (only serialize if value is not null)

Expand Down Expand Up @@ -725,6 +747,7 @@ public int hashCode() {
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
result = prime * result + ((user == null) ? 0 : user.hashCode());
result = prime * result + concurrency;
result = prime * result + (int) (pendingAckTimeout ^ (pendingAckTimeout >>> 32));
result = prime * result + (configurationManaged ? 1231 : 1237);
return result;
}
Expand Down Expand Up @@ -811,6 +834,8 @@ public boolean equals(Object obj) {
return false;
if (concurrency != other.concurrency)
return false;
if (pendingAckTimeout != other.pendingAckTimeout)
return false;
if (configurationManaged != other.configurationManaged)
return false;
return true;
Expand Down Expand Up @@ -857,6 +882,7 @@ public int getEncodeSize() {
BufferHelper.sizeOfNullableInteger(minLargeMessageSize) +
BufferHelper.sizeOfNullableLong(callTimeout) +
BufferHelper.sizeOfNullableInteger(concurrency) +
BufferHelper.sizeOfNullableLong(pendingAckTimeout) +
BufferHelper.sizeOfNullableBoolean(configurationManaged) +
DataConstants.SIZE_BYTE +
transformerSize +
Expand Down Expand Up @@ -909,6 +935,7 @@ public void encode(ActiveMQBuffer buffer) {
} else {
buffer.writeInt(0);
}
buffer.writeNullableLong(pendingAckTimeout);
}

public void decode(ActiveMQBuffer buffer) {
Expand Down Expand Up @@ -952,6 +979,9 @@ public void decode(ActiveMQBuffer buffer) {
staticConnectors.add(buffer.readNullableString());
}
}
if (buffer.readable()) {
pendingAckTimeout = buffer.readNullableLong();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2475,6 +2475,8 @@ private void parseBridgeConfiguration(final Element brNode, final Configuration

int concurrency = getInteger(brNode, "concurrency", ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(), GT_ZERO);

long pendingAckTimeout = getLong(brNode, "pending-ack-timeout", ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout(), GT_ZERO);

NodeList clusterPassNodes = brNode.getElementsByTagName("password");
String password = null;

Expand Down Expand Up @@ -2541,7 +2543,8 @@ private void parseBridgeConfiguration(final Element brNode, final Configuration
.setUser(user)
.setPassword(password)
.setRoutingType(routingType)
.setConcurrency(concurrency);
.setConcurrency(concurrency)
.setPendingAckTimeout(pendingAckTimeout);

if (!staticConnectorNames.isEmpty()) {
config.setStaticConnectors(staticConnectorNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.logs.annotation.LogBundle;
import org.apache.activemq.artemis.logs.annotation.Message;
import org.apache.activemq.artemis.logs.BundleFactory;
Expand Down Expand Up @@ -555,4 +556,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
@Message(id = 229254, value = "Already replicating, started={}")
ActiveMQIllegalStateException alreadyReplicating(boolean status);

@Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
Expand Down Expand Up @@ -152,13 +153,13 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 221027, value = "Bridge {} is connected", level = LogMessage.Level.INFO)
void bridgeConnected(BridgeImpl name);

@LogMessage(id = 221028, value = "Bridge is stopping, will not retry", level = LogMessage.Level.INFO)
void bridgeStopping();
@LogMessage(id = 221028, value = "Bridge is {}, will not retry", level = LogMessage.Level.INFO)
void bridgeWillNotRetry(String operation);

@LogMessage(id = 221029, value = "stopped bridge {}", level = LogMessage.Level.INFO)
@LogMessage(id = 221029, value = "Stopped bridge {}", level = LogMessage.Level.INFO)
void bridgeStopped(String name);

@LogMessage(id = 221030, value = "paused bridge {}", level = LogMessage.Level.INFO)
@LogMessage(id = 221030, value = "Paused bridge {}", level = LogMessage.Level.INFO)
void bridgePaused(String name);

@LogMessage(id = 221031, value = "backup announced", level = LogMessage.Level.INFO)
Expand Down Expand Up @@ -197,8 +198,8 @@ void messageWithDuplicateID(Object duplicateProperty,
@LogMessage(id = 221041, value = "Cannot find queue {} while reloading PAGE_CURSOR_COMPLETE, deleting record now", level = LogMessage.Level.INFO)
void cantFindQueueOnPageComplete(long queueID);

@LogMessage(id = 221042, value = "Bridge {} timed out waiting for the completion of {} messages, we will just shutdown the bridge after 10 seconds wait", level = LogMessage.Level.INFO)
void timedOutWaitingCompletions(String bridgeName, long numberOfMessages);
@LogMessage(id = 221042, value = "{} bridge {} timed out waiting for the send acknowledgement of {} messages. Messages may be duplicated between the bridge's source and the target.", level = LogMessage.Level.INFO)
void timedOutWaitingForSendAcks(String operation, String bridgeName, long numberOfMessages);

@LogMessage(id = 221043, value = "Protocol module found: [{}]. Adding protocol support for: {}", level = LogMessage.Level.INFO)
void addingProtocolSupport(String moduleName, String protocolKey);
Expand Down Expand Up @@ -789,8 +790,8 @@ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
@LogMessage(id = 222159, value = "unable to send notification when broadcast group is stopped", level = LogMessage.Level.WARN)
void broadcastBridgeStoppedError(Exception e);

@LogMessage(id = 222160, value = "unable to send notification when broadcast group is stopped", level = LogMessage.Level.WARN)
void notificationBridgeStoppedError(Exception e);
@LogMessage(id = 222160, value = "unable to send notification for bridge {}: {}", level = LogMessage.Level.WARN)
void notificationBridgeError(String bridge, CoreNotificationType type, Exception e);

@LogMessage(id = 222161, value = "Group Handler timed-out waiting for sendCondition", level = LogMessage.Level.WARN)
void groupHandlerSendTimeout();
Expand Down Expand Up @@ -1302,8 +1303,8 @@ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
@LogMessage(id = 224030, value = "Could not cancel reference {}", level = LogMessage.Level.ERROR)
void errorCancellingRefOnBridge(MessageReference ref2, Exception e);

@LogMessage(id = 224032, value = "Failed to pause bridge", level = LogMessage.Level.ERROR)
void errorPausingBridge(Exception e);
@LogMessage(id = 224032, value = "Failed to pause bridge: {}", level = LogMessage.Level.ERROR)
void errorPausingBridge(String bridgeName, Exception e);

@LogMessage(id = 224033, value = "Failed to broadcast connector configs", level = LogMessage.Level.ERROR)
void errorBroadcastingConnectorConfigs(Exception e);
Expand Down Expand Up @@ -1617,4 +1618,7 @@ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,

@LogMessage(id = 224138, value = "Error Registering DuplicateCacheSize on namespace {}", level = LogMessage.Level.WARN)
void errorRegisteringDuplicateCacheSize(String address, Exception reason);

@LogMessage(id = 224139, value = "Failed to stop bridge: {}", level = LogMessage.Level.ERROR)
void errorStoppingBridge(String bridgeName, Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void stop() throws Exception {

for (Bridge bridge : bridges.values()) {
bridge.stop();
managementService.unregisterBridge(bridge.getName().toString());
managementService.unregisterBridge(bridge.getConfiguration().getName());
}

bridges.clear();
Expand Down Expand Up @@ -532,17 +532,17 @@ public void destroyBridge(final String name) throws Exception {

synchronized (this) {
for (Bridge bridge : bridges.values()) {
if (bridge.getName().toString().matches(name + "|" + name + "-\\d+")) {
bridge = bridges.get(bridge.getName().toString());
if (bridge.getConfiguration().getName().matches(name + "|" + name + "-\\d+")) {
bridge = bridges.get(bridge.getConfiguration().getName());
if (bridge != null) {
bridgesToRemove.add(bridge);
}
}
}
for (Bridge bridgeToRemove : bridgesToRemove) {
bridges.remove(bridgeToRemove.getName().toString());
bridges.remove(bridgeToRemove.getConfiguration().getName());
bridgeToRemove.stop();
managementService.unregisterBridge(bridgeToRemove.getName().toString());
managementService.unregisterBridge(bridgeToRemove.getConfiguration().getName());
}
}
for (Bridge bridge : bridgesToRemove) {
Expand Down
Loading

0 comments on commit 19d8059

Please sign in to comment.