Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky ServerStatusManagerIntegrationTest #10

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import com.linecorp.centraldogma.server.internal.api.UpdateServerStatusRequest.Scope;
import com.linecorp.centraldogma.testing.internal.CentralDogmaReplicationExtension;
import com.linecorp.centraldogma.testing.internal.CentralDogmaRuleDelegate;
import com.linecorp.centraldogma.testing.internal.FlakyTest;

@FlakyTest
class ServerStatusManagerIntegrationTest {
@RegisterExtension
final CentralDogmaReplicationExtension cluster = new CentralDogmaReplicationExtension(3) {
Expand Down Expand Up @@ -87,8 +85,6 @@ void preserveStatusAfterRestarting() throws Exception {
assertThatThrownBy(() -> getServerStatus(client))
.isInstanceOf(UnprocessedRequestException.class)
.hasCauseInstanceOf(ConnectException.class);
// Wait for the ports acquired to be released.
Thread.sleep(5000);

// Restart the cluster with the same configuration.
cluster.start();
Expand All @@ -113,8 +109,6 @@ void preserveStatusAfterRestarting() throws Exception {
assertThatThrownBy(() -> getServerStatus(client))
.isInstanceOf(UnprocessedRequestException.class)
.hasCauseInstanceOf(ConnectException.class);
// Wait for the ports acquired to be released.
Thread.sleep(5000);

cluster.start();
serverStatus = getServerStatus(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -41,6 +44,7 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.server.ServerPort;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.api.v1.AccessToken;
import com.linecorp.centraldogma.server.CentralDogma;
Expand All @@ -60,7 +64,7 @@
*/
public class CentralDogmaReplicationExtension extends AbstractAllOrEachExtension {

private static final int MAX_RETRIES = 10;
private static final int MAX_RETRIES = 16;

private final TemporaryFolder tmpDir = new TemporaryFolder();
private final AuthProviderFactory factory = new TestAuthProviderFactory();
Expand Down Expand Up @@ -186,19 +190,66 @@ protected void after(ExtensionContext context) throws Exception {
} catch (IOException e) {
throw new CompletionException(e);
}
});
}).join();
}
}

public void start() {
public void start() throws InterruptedException {
if (dogmaCluster == null) {
throw new IllegalStateException("Central Dogma cluster is not created yet");
}

final List<Integer> ports = new ArrayList<>(12);
final ZooKeeperReplicationConfig zkConfig =
(ZooKeeperReplicationConfig) dogmaCluster.get(0).dogma().config().replicationConfig();
for (ZooKeeperServerConfig config : zkConfig.servers().values()) {
ports.add(config.clientPort());
ports.add(config.electionPort());
ports.add(config.quorumPort());
}
for (CentralDogmaRuleDelegate delegate : dogmaCluster) {
for (ServerPort port : delegate.dogma().config().ports()) {
ports.add(port.localAddress().getPort());
}
}

// This logic won't completely prevent port duplication, but it is best efforts to reduce flakiness.
boolean success = true;
for (int i = 0; i < MAX_RETRIES * 2; i++) {
success = true;
for (Integer port : ports) {
if (!isTcpPortAvailable(port)) {
success = false;
break;
}
}
if (success) {
break;
} else if (i < MAX_RETRIES - 1) {
Thread.sleep(1500);
}
}
if (!success) {
throw new IllegalStateException("Failed to find available ports for the Central Dogma cluster. " +
"candidates: " + ports);
}

dogmaCluster.stream().map(CentralDogmaRuleDelegate::dogma)
.map(CentralDogma::start)
.collect(Collectors.toList())
.forEach(CompletableFuture::join);
// Wait for the Central Dogma cluster to be ready.
Thread.sleep(500);
}

private static boolean isTcpPortAvailable(int port) {
try (ServerSocket ignored = new ServerSocket(port, 1,
InetAddress.getByName("127.0.0.1"))) {
return true;
} catch (IOException e) {
// Port in use or unable to bind.
return false;
}
}

public void stop() {
Expand Down
Loading