Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --connection-allocation argument #777

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 36 additions & 9 deletions src/main/java/com/rabbitmq/perf/MulticastSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// [email protected].
package com.rabbitmq.perf;

import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.RANDOM;
import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.ROUND_ROBIN;
import static com.rabbitmq.perf.Utils.isRecoverable;
import static java.lang.Math.min;
import static java.lang.String.format;
Expand Down Expand Up @@ -48,8 +50,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -102,7 +106,8 @@ public MulticastSet(
completionHandler,
new ShutdownService(),
new ExpectedMetrics(params, new SimpleMeterRegistry(), "perftest_", Collections.emptyMap()),
InstanceSynchronization.NO_OP);
InstanceSynchronization.NO_OP,
RANDOM);
}

public MulticastSet(
Expand All @@ -114,7 +119,8 @@ public MulticastSet(
CompletionHandler completionHandler,
ShutdownService shutdownService,
ExpectedMetrics expectedMetrics,
InstanceSynchronization instanceSynchronization) {
InstanceSynchronization instanceSynchronization,
PerfTest.CONNECTION_ALLOCATION connectionAllocation) {
this.performanceMetrics = performanceMetrics;
this.factory = factory;
this.params = params;
Expand Down Expand Up @@ -158,7 +164,7 @@ public MulticastSet(
input -> Long.valueOf(input));
}

this.connectionCreator = new ConnectionCreator(this.factory, this.uris);
this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation);
this.expectedMetrics = expectedMetrics;
this.instanceSynchronization = instanceSynchronization;
}
Expand Down Expand Up @@ -952,12 +958,17 @@ private static class ConnectionCreator {

private final ConnectionFactory cf;
private final List<Address> addresses;
private final UnaryOperator<List<Address>> connectionAllocation;

private ConnectionCreator(ConnectionFactory cf, List<String> uris) {
private ConnectionCreator(
ConnectionFactory cf,
List<String> uris,
PerfTest.CONNECTION_ALLOCATION connectionAllocation) {
this.cf = cf;
if (uris == null || uris.isEmpty()) {
// URI already set on the connection factory, nothing special to do
addresses = Collections.emptyList();
this.connectionAllocation = UnaryOperator.identity();
} else {
List<Address> addresses = new ArrayList<>(uris.size());
for (String uri : uris) {
Expand All @@ -968,6 +979,26 @@ private ConnectionCreator(ConnectionFactory cf, List<String> uris) {
}
}
this.addresses = Collections.unmodifiableList(addresses);
if (connectionAllocation == RANDOM) {
this.connectionAllocation =
l -> {
List<Address> addrs = new ArrayList<>(l);
if (addresses.size() > 1) {
Collections.shuffle(addrs);
}
return addrs;
};
} else if (connectionAllocation == ROUND_ROBIN) {
AtomicInteger allocationCount = new AtomicInteger(0);
this.connectionAllocation =
l -> {
Address addr = l.get(allocationCount.getAndIncrement() % l.size());
return Collections.singletonList(addr);
};
} else {
throw new IllegalArgumentException(
"Unknown connection allocation type: " + connectionAllocation.name());
}
}
}

Expand All @@ -984,11 +1015,7 @@ Connection createConnection(String name) throws IOException, TimeoutException {
if (this.addresses.isEmpty()) {
connection = this.cf.newConnection(name);
} else {
List<Address> addrs = new ArrayList<>(addresses);
if (addresses.size() > 1) {
Collections.shuffle(addrs);
}
connection = this.cf.newConnection(addrs, name);
connection = this.cf.newConnection(this.connectionAllocation.apply(this.addresses), name);
}
addBlockedListener(connection);
return connection;
Expand Down
42 changes: 41 additions & 1 deletion src/main/java/com/rabbitmq/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,23 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {

MulticastParams p = multicastParams(cmd, uris, perfTestOptions);

String connectionAllocationParam =
strArg(cmd, "cal", CONNECTION_ALLOCATION.RANDOM.allocation());
CONNECTION_ALLOCATION connectionAllocation =
Arrays.stream(CONNECTION_ALLOCATION.values())
.filter(a -> a.allocation().equals(connectionAllocationParam))
.findAny()
.orElse(null);

validate(
() -> connectionAllocation != null,
"--connection-allocation must one of "
+ Arrays.stream(CONNECTION_ALLOCATION.values())
.map(CONNECTION_ALLOCATION::allocation)
.collect(Collectors.joining(", ")),
systemExiter,
consoleErr);

ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();

MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p, completionReasons);
Expand Down Expand Up @@ -344,7 +361,8 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {
completionHandler,
shutdownService,
expectedMetrics,
instanceSynchronization);
instanceSynchronization,
connectionAllocation);
set.run(true);

statsSummary.run();
Expand Down Expand Up @@ -1364,6 +1382,13 @@ static Options getOptions() {

options.addOption(new Option("tnd", "tcp-no-delay", true, "value for TCP NODELAY option"));

options.addOption(
new Option(
"cal",
"connection-allocation",
true,
"the way to allocate connection across nodes (random or round-robin), default is random."));

return options;
}

Expand Down Expand Up @@ -1614,6 +1639,21 @@ enum EXIT_WHEN {
IDLE
}

enum CONNECTION_ALLOCATION {
RANDOM("random"),
ROUND_ROBIN("round-robin");

private final String allocation;

CONNECTION_ALLOCATION(String allocation) {
this.allocation = allocation;
}

public String allocation() {
return allocation;
}
}

private static ByteCapacity validateByteCapacity(
String value, SystemExiter exiter, PrintStream output) {
try {
Expand Down