Skip to content

Commit

Permalink
Fix formatting and apply change to MultihomeIOSessionRequester
Browse files Browse the repository at this point in the history
  • Loading branch information
arturobernalg committed Nov 15, 2024
1 parent 71bf32b commit f17a60e
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,9 @@ public void connect(
socket.connect(remoteAddress, TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0);
conn.bind(socket);
onAfterSocketConnect(context, endpointHost);

if (LOG.isDebugEnabled()) {
LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(conn), endpointHost, conn.getLocalAddress(), conn.getRemoteAddress());
}

conn.setSocketTimeout(soTimeout);
final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(endpointHost.getSchemeName()) : null;
if (tlsSocketStrategy != null) {
Expand All @@ -226,7 +224,6 @@ public void connect(
}
}
return;

} catch (final RuntimeException ex) {
Closer.closeQuietly(socket);
throw ex;
Expand All @@ -245,8 +242,6 @@ public void connect(
}
}



@Override
public void upgrade(
final ManagedHttpClientConnection conn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -108,19 +108,19 @@ public void cancelled() {
LOG.debug("{} resolving remote address", remoteEndpoint.getHostName());
}

final InetAddress[] remoteAddresses;
final List<SocketAddress> remoteAddresses;
try {
remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName());
if (remoteAddresses == null || remoteAddresses.length == 0) {
throw new UnknownHostException(remoteEndpoint.getHostName());
remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName(), remoteEndpoint.getPort());
if (remoteAddresses == null || remoteAddresses.isEmpty()) {
throw new UnknownHostException(remoteEndpoint.getHostName());
}
} catch (final UnknownHostException ex) {
future.failed(ex);
return future;
}

if (LOG.isDebugEnabled()) {
LOG.debug("{} resolved to {}", remoteEndpoint.getHostName(), Arrays.asList(remoteAddresses));
LOG.debug("{} resolved to {}", remoteEndpoint.getHostName(), remoteAddresses);
}

final Runnable runnable = new Runnable() {
Expand All @@ -129,7 +129,7 @@ public void cancelled() {

void executeNext() {
final int index = attempt.getAndIncrement();
final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], remoteEndpoint.getPort());
final InetSocketAddress remoteAddress = (InetSocketAddress) remoteAddresses.get(index);

if (LOG.isDebugEnabled()) {
LOG.debug("{}:{} connecting {}->{} ({})",
Expand All @@ -155,13 +155,17 @@ public void completed(final IOSession session) {

@Override
public void failed(final Exception cause) {
if (attempt.get() >= remoteAddresses.length) {
if (attempt.get() >= remoteAddresses.size()) {
if (LOG.isDebugEnabled()) {
LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
}
if (cause instanceof IOException) {
future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, remoteAddresses));
final InetAddress[] addresses = remoteAddresses.stream()
.filter(addr -> addr instanceof InetSocketAddress)
.map(addr -> ((InetSocketAddress) addr).getAddress())
.toArray(InetAddress[]::new);
future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, addresses));
} else {
future.failed(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ void testConnect() throws Exception {
Mockito.when(detachedSocketFactory.create(Mockito.any())).thenReturn(socket);

final SocketConfig socketConfig = SocketConfig.custom()
.setSoKeepAlive(true)
.setSoReuseAddress(true)
.setSoTimeout(5000, TimeUnit.MILLISECONDS)
.setTcpNoDelay(true)
.setSoLinger(50, TimeUnit.MILLISECONDS)
.build();
.setSoKeepAlive(true)
.setSoReuseAddress(true)
.setSoTimeout(5000, TimeUnit.MILLISECONDS)
.setTcpNoDelay(true)
.setSoLinger(50, TimeUnit.MILLISECONDS)
.build();
final InetSocketAddress localAddress = new InetSocketAddress(local, 0);
connectionOperator.connect(conn, host, null, localAddress, Timeout.ofMilliseconds(123), socketConfig, null, context);

Expand All @@ -123,7 +123,6 @@ void testConnect() throws Exception {
Mockito.verify(conn, Mockito.times(2)).bind(socket);
}


@Test
void testConnectWithTLSUpgrade() throws Exception {
final HttpClientContext context = HttpClientContext.create();
Expand Down Expand Up @@ -234,6 +233,7 @@ void testConnectFailover() throws Exception {
Mockito.verify(socket, Mockito.times(2)).bind(localAddress);
Mockito.verify(socket).connect(ipAddress2, 123);
Mockito.verify(conn, Mockito.times(3)).bind(socket);

}

@Test
Expand All @@ -258,8 +258,6 @@ void testConnectExplicitAddress() throws Exception {
Mockito.verify(conn, Mockito.times(2)).bind(socket);
}



@Test
void testUpgrade() throws Exception {
final HttpClientContext context = HttpClientContext.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ void testTargetConnect() throws Exception {
Mockito.when(conn.isOpen()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
Mockito.eq(null),
Mockito.any(),
Mockito.eq(null)))
Mockito.eq(route),
Mockito.eq(null),
Mockito.any(),
Mockito.eq(null)))
.thenReturn(future);

final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
Expand Down Expand Up @@ -308,10 +308,10 @@ void testProxyConnectAndUpgrade() throws Exception {
Mockito.when(conn.isOpen()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(
Mockito.eq(route),
Mockito.eq(null),
Mockito.any(),
Mockito.eq(null)))
Mockito.eq(route),
Mockito.eq(null),
Mockito.any(),
Mockito.eq(null)))
.thenReturn(future);

final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.client5.http.impl.nio;

import static org.mockito.ArgumentMatchers.any;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class MultihomeIOSessionRequesterTest {

private DnsResolver dnsResolver;
private ConnectionInitiator connectionInitiator;
private MultihomeIOSessionRequester sessionRequester;
private NamedEndpoint namedEndpoint;

@BeforeEach
void setUp() {
dnsResolver = Mockito.mock(DnsResolver.class);
connectionInitiator = Mockito.mock(ConnectionInitiator.class);
namedEndpoint = Mockito.mock(NamedEndpoint.class);
sessionRequester = new MultihomeIOSessionRequester(dnsResolver);
}

@Test
void testConnectWithMultipleAddresses() throws Exception {
final InetAddress address1 = InetAddress.getByAddress(new byte[]{10, 0, 0, 1});
final InetAddress address2 = InetAddress.getByAddress(new byte[]{10, 0, 0, 2});
final List<SocketAddress> remoteAddresses = Arrays.asList(
new InetSocketAddress(address1, 8080),
new InetSocketAddress(address2, 8080)
);

Mockito.when(namedEndpoint.getHostName()).thenReturn("somehost");
Mockito.when(namedEndpoint.getPort()).thenReturn(8080);
Mockito.when(dnsResolver.resolve("somehost", 8080)).thenReturn(remoteAddresses);

Mockito.when(connectionInitiator.connect(any(), any(), any(), any(), any(), any()))
.thenAnswer(invocation -> {
final FutureCallback<IOSession> callback = invocation.getArgument(5);
callback.failed(new IOException("Simulated connection failure"));
return CompletableFuture.failedFuture(new IOException("Simulated connection failure"));
});

final Future<IOSession> future = sessionRequester.connect(
connectionInitiator,
namedEndpoint,
null,
Timeout.ofMilliseconds(500),
null,
null
);

Assertions.assertTrue(future.isDone());
try {
future.get();
Assertions.fail("Expected ExecutionException");
} catch (final ExecutionException ex) {
Assertions.assertTrue(ex.getCause() instanceof IOException);
Assertions.assertEquals("Simulated connection failure", ex.getCause().getMessage());
}
}

@Test
void testConnectSuccessfulAfterRetries() throws Exception {
final InetAddress address1 = InetAddress.getByAddress(new byte[]{10, 0, 0, 1});
final InetAddress address2 = InetAddress.getByAddress(new byte[]{10, 0, 0, 2});
final List<SocketAddress> remoteAddresses = Arrays.asList(
new InetSocketAddress(address1, 8080),
new InetSocketAddress(address2, 8080)
);

Mockito.when(namedEndpoint.getHostName()).thenReturn("somehost");
Mockito.when(namedEndpoint.getPort()).thenReturn(8080);
Mockito.when(dnsResolver.resolve("somehost", 8080)).thenReturn(remoteAddresses);

Mockito.when(connectionInitiator.connect(any(), any(), any(), any(), any(), any()))
.thenAnswer(invocation -> {
final FutureCallback<IOSession> callback = invocation.getArgument(5);
final InetSocketAddress remoteAddress = invocation.getArgument(1);
if (remoteAddress.getAddress().equals(address1)) {
// Fail the first address
callback.failed(new IOException("Simulated connection failure"));
return CompletableFuture.failedFuture(new IOException("Simulated connection failure"));
} else {
// Succeed for the second address
final IOSession mockSession = Mockito.mock(IOSession.class);
callback.completed(mockSession);
return CompletableFuture.completedFuture(mockSession);
}
});

final Future<IOSession> future = sessionRequester.connect(
connectionInitiator,
namedEndpoint,
null,
Timeout.ofMilliseconds(500),
null,
null
);

Assertions.assertTrue(future.isDone());
try {
final IOSession session = future.get();
Assertions.assertNotNull(session);
} catch (final ExecutionException ex) {
Assertions.fail("Did not expect an ExecutionException", ex);
}
}
}

0 comments on commit f17a60e

Please sign in to comment.