Skip to content

Commit

Permalink
The address resolver is missing the notion of endpoint which is an in…
Browse files Browse the repository at this point in the history
…termediary between the resolved address and the socket address the client eventually uses. This forces address resolver implementation to determine which internal endpoint was selected based on the socket address the resolver choose, specially when the client report usage statistics to the resolver.

Introduce endpoint in the address resolver, the resolver now does select an endpoint for which a socket address can be determined, metrics can now use the endpoint type to properly report usage statistics.
  • Loading branch information
vietj committed Sep 29, 2023
1 parent 1e96e06 commit 5a17ab3
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ public HttpClient build() {
if (co.isShared()) {
CloseFuture closeFuture = new CloseFuture();
client = vertx.createSharedResource("__vertx.shared.httpClients", co.getName(), closeFuture, cf_ -> {
io.vertx.core.spi.net.AddressResolver<?, ?, ?> resolver = addressResolver != null ? addressResolver.resolver(vertx) : null;
io.vertx.core.spi.net.AddressResolver<?, ?, ?, ?> resolver = addressResolver != null ? addressResolver.resolver(vertx) : null;
HttpClientImpl impl = new HttpClientImpl(vertx, resolver, co, po);
cf_.add(completion -> impl.close().onComplete(completion));
return impl;
});
client = new CleanableHttpClient((HttpClientInternal) client, vertx.cleaner(), (timeout, timeunit) -> closeFuture.close());
closeable = closeFuture;
} else {
io.vertx.core.spi.net.AddressResolver<?, ?, ?> resolver = addressResolver != null ? addressResolver.resolver(vertx) : null;
io.vertx.core.spi.net.AddressResolver<?, ?, ?, ?> resolver = addressResolver != null ? addressResolver.resolver(vertx) : null;
HttpClientImpl impl = new HttpClientImpl(vertx, resolver, co, po);
closeable = impl;
client = new CleanableHttpClient(impl, vertx.cleaner(), impl::shutdown);
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal

private final PoolOptions poolOptions;
private final ConnectionManager<EndpointKey, Lease<HttpClientConnection>> httpCM;
private final EndpointResolver<?, EndpointKey, Lease<HttpClientConnection>, ?> endpointResolver;
private final EndpointResolver<?, EndpointKey, Lease<HttpClientConnection>, ?, ?> endpointResolver;
private volatile Function<HttpClientResponse, Future<RequestOptions>> redirectHandler = DEFAULT_HANDLER;
private long timerID;
private volatile Handler<HttpConnection> connectionHandler;
private final Function<ContextInternal, ContextInternal> contextProvider;

public HttpClientImpl(VertxInternal vertx, AddressResolver<?, ?, ?> addressResolver, HttpClientOptions options, PoolOptions poolOptions) {
public HttpClientImpl(VertxInternal vertx, AddressResolver<?, ?, ?, ?> addressResolver, HttpClientOptions options, PoolOptions poolOptions) {
super(vertx, options);
if (addressResolver != null) {
this.endpointResolver = new EndpointResolver<>(addressResolver);
Expand Down Expand Up @@ -335,7 +335,7 @@ private Future<HttpClientRequest> doRequest(
timeout,
ctx,
connCtx,
(EndpointResolver.ResolvedEndpoint) endpoint));
(EndpointResolver.AddressEndpoint) endpoint));
if (future != null) {
proxyOptions = proxyConfig;
} else {
Expand Down Expand Up @@ -396,22 +396,22 @@ private HttpClientRequest createRequest(
/**
* Create a decorated {@link HttpClientStream} that will gather stream statistics reported to the {@link AddressResolver}
*/
private static <S, A extends Address> Optional<Future<HttpClientStream>> createDecoratedHttpClientStream(
private static <S, A extends Address, E> Optional<Future<HttpClientStream>> createDecoratedHttpClientStream(
long timeout,
ContextInternal ctx,
ContextInternal connCtx,
EndpointResolver.ResolvedEndpoint resolvedEndpoint) {
Future<Lease<HttpClientConnection>> f = resolvedEndpoint.getConnection(connCtx, timeout);
EndpointResolver.AddressEndpoint addressEndpoint) {
Future<ConnectionLookup<Lease<HttpClientConnection>, E, ?>> f = addressEndpoint.getConnection(connCtx, timeout);
if (f == null) {
return Optional.empty();
} else {
return Optional.of(f.compose(lease -> {
return Optional.of(f.compose(res -> {
Lease<HttpClientConnection> lease = res.connection();
HttpClientConnection conn = lease.get();
return conn
.createStream(ctx)
.map(stream -> {
AddressResolver<S, A, ?> resolver = resolvedEndpoint.addressResolver();
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream<>(stream, (AddressResolver)resolver, resolvedEndpoint.state(), conn.remoteAddress());
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream<>(stream, res);
wrapped.closeHandler(v -> lease.recycle());
return wrapped;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,22 @@
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.net.AddressResolver;
import io.vertx.core.net.impl.pool.ConnectionLookup;
import io.vertx.core.streams.WriteStream;

/**
* Decorates an {@link HttpClientStream} that gathers usage statistics.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class StatisticsGatheringHttpClientStream<S, M> implements HttpClientStream {
class StatisticsGatheringHttpClientStream<M, E> implements HttpClientStream {

private final HttpClientStream delegate;
private final AddressResolver<S, ?, M> resolver;
private final S state;
private final SocketAddress server;
private M metric;
private final ConnectionLookup<?, E, M> lookup;

StatisticsGatheringHttpClientStream(HttpClientStream delegate, AddressResolver<S, ?, M> resolver, S state, SocketAddress server) {
StatisticsGatheringHttpClientStream(HttpClientStream delegate, ConnectionLookup<?, E, M> lookup) {
this.delegate = delegate;
this.resolver = resolver;
this.state = state;
this.server = server;
this.lookup = lookup;
}

@Override
Expand Down Expand Up @@ -77,17 +71,17 @@ public ContextInternal getContext() {

@Override
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
metric = resolver.requestBegin(state, server);
lookup.beginRequest();
if (end) {
resolver.requestEnd(metric);
lookup.endRequest();
}
return delegate.writeHead(request, chunked, buf, end, priority, connect);
}

@Override
public Future<Void> writeBuffer(ByteBuf buf, boolean end) {
if (end) {
resolver.requestEnd(metric);
lookup.endRequest();
}
return delegate.writeBuffer(buf, end);
}
Expand Down Expand Up @@ -121,7 +115,7 @@ public void unknownFrameHandler(Handler<HttpFrame> handler) {
public void headHandler(Handler<HttpResponseHead> handler) {
if (handler != null) {
delegate.headHandler(multimap -> {
resolver.responseBegin(metric);
lookup.beginResponse();
handler.handle(multimap);
});
} else {
Expand All @@ -138,7 +132,7 @@ public void chunkHandler(Handler<Buffer> handler) {
public void endHandler(Handler<MultiMap> handler) {
if (handler != null) {
delegate.endHandler(multimap -> {
resolver.responseEnd(metric);
lookup.endResponse();
handler.handle(multimap);
});
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/AddressResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ public interface AddressResolver {
* @param vertx the vertx instance
* @return the resolver
*/
io.vertx.core.spi.net.AddressResolver<?, ?, ?> resolver(Vertx vertx);
io.vertx.core.spi.net.AddressResolver<?, ?, ?, ?> resolver(Vertx vertx);

}
50 changes: 50 additions & 0 deletions src/main/java/io/vertx/core/net/impl/pool/ConnectionLookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl.pool;

import io.vertx.core.spi.net.AddressResolver;

/**
* A connection lookup to the endpoint resolver, provides additional information.
*/
public class ConnectionLookup<C, E, M> {

private final AddressResolver<?, ?, M, E> resolver;
private final C connection;
private final E endpoint;
private M metric;

public ConnectionLookup(C connection, AddressResolver<?, ?, M, E> resolver, E endpoint) {
this.connection = connection;
this.endpoint = endpoint;
this.resolver = resolver;
}

public C connection() {
return connection;
}

public void beginRequest() {
metric = resolver.requestBegin(endpoint);
}

public void endRequest() {
resolver.requestEnd(metric);
}

public void beginResponse() {
resolver.responseBegin(metric);
}

public void endResponse() {
resolver.responseEnd(metric);
}
}
70 changes: 18 additions & 52 deletions src/main/java/io/vertx/core/net/impl/pool/EndpointResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
* @param <C> the connection type
* @param <A> the resolved address type
*/
public class EndpointResolver<S, K, C, A extends Address> {
public class EndpointResolver<S, K, C, A extends Address, E> {

private final AddressResolver<S, A, ?> addressResolver;
private final ConnectionManager<A, C> connectionManager;
private final AddressResolver<S, A, ?, E> addressResolver;
private final ConnectionManager<A, ConnectionLookup<C, E, ?>> connectionManager;

public EndpointResolver(AddressResolver<S, A, ?> addressResolver) {
public EndpointResolver(AddressResolver<S, A, ?, E> addressResolver) {
this.addressResolver = addressResolver;
this.connectionManager = new ConnectionManager<>();
}
Expand All @@ -56,12 +56,12 @@ public <T, P> T withEndpoint(Address address,
P payload,
EndpointProvider<K, C> endpointProvider,
BiFunction<P, SocketAddress, K> zfn,
Function<Endpoint<C>, Optional<T>> fn) {
Function<Endpoint<ConnectionLookup<C, E, ?>>, Optional<T>> fn) {
A resolverAddress = addressResolver.tryCast(address);
if (resolverAddress == null) {
return null;
} else {
EndpointProvider<A, C> provider = (key, disposer) -> new ResolvedEndpoint(
EndpointProvider<A, ConnectionLookup<C, E, ?>> provider = (key, disposer) -> new AddressEndpoint<>(
addressResolver.resolve(key),
disposer,
key,
Expand All @@ -81,16 +81,16 @@ public <T, P> T withEndpoint(Address address,
// Trick to pass payload (as we have synchronous calls)
final ThreadLocal<Object> thread_local = new ThreadLocal<>();

public class ResolvedEndpoint<P> extends Endpoint<C> {
public class AddressEndpoint<P> extends Endpoint<ConnectionLookup<C, E, ?>> {

private final AtomicReference<S> state;
private final AtomicReference<Future<S>> stateRef;
private final ConnectionManager<EndpointKeyWrapper, C> connectionManager;
private final ConnectionManager<K, C> connectionManager;
private final A address;
private final EndpointProvider<K, C> endpointProvider;
private final BiFunction<P, SocketAddress, K> zfn;

public ResolvedEndpoint(Future<S> stateRef, Runnable disposer, A address, EndpointProvider<K, C> endpointProvider, BiFunction<P, SocketAddress, K> zfn) {
public AddressEndpoint(Future<S> stateRef, Runnable disposer, A address, EndpointProvider<K, C> endpointProvider, BiFunction<P, SocketAddress, K> zfn) {
super(() -> {
if (stateRef.result() != null) {
addressResolver.dispose(stateRef.result());
Expand All @@ -115,17 +115,13 @@ public S state() {
return state.get();
}

public AddressResolver<S, A, ?> addressResolver() {
return addressResolver;
}

@Override
public void checkExpired() {
connectionManager.checkExpired();
}

@Override
public Future<C> requestConnection(ContextInternal ctx, long timeout) {
public Future<ConnectionLookup<C, E, ?>> requestConnection(ContextInternal ctx, long timeout) {
P payload = (P) thread_local.get();
Future<S> fut = stateRef.get();
return fut.transform(ar -> {
Expand All @@ -138,7 +134,7 @@ public Future<C> requestConnection(ContextInternal ctx, long timeout) {
if (stateRef.compareAndSet(fut, promise.future())) {
addressResolver.resolve(address).andThen(ar2 -> {
if (ar2.succeeded()) {
ResolvedEndpoint.this.state.set(ar2.result());
AddressEndpoint.this.state.set(ar2.result());
}
}).onComplete(promise);
return promise.future();
Expand All @@ -150,54 +146,24 @@ public Future<C> requestConnection(ContextInternal ctx, long timeout) {
return (Future<S>) ar;
}
}).compose(state -> addressResolver
.pickAddress(state)
.compose(origin -> {
.pickEndpoint(state)
.compose(endpoint -> {
incRefCount();
SocketAddress origin = addressResolver.addressOf(endpoint);
K apply = zfn.apply(payload, origin);
return connectionManager.getConnection(ctx, new EndpointKeyWrapper(apply) {
@Override
void cleanup() {
addressResolver.removeAddress(state, origin);
decRefCount();
}
}, (key, dispose) -> {
Future<C> f = connectionManager.getConnection(ctx, apply, (key, dispose) -> {
class Disposer implements Runnable {
@Override
public void run() {
key.cleanup();
addressResolver.removeAddress(state, endpoint);
decRefCount();
dispose.run();
}
}
return endpointProvider.create(apply, new Disposer());
}, timeout);
return f.map(c -> new ConnectionLookup<>(c, addressResolver, endpoint));
}));
}
}

private abstract class EndpointKeyWrapper {
final K address;
public EndpointKeyWrapper(K address) {
this.address = address;
}
abstract void cleanup();
@Override
public int hashCode() {
return address.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof EndpointResolver.EndpointKeyWrapper) {
EndpointKeyWrapper that = (EndpointKeyWrapper) obj;
return address.equals(that.address);
}
return false;
}
@Override
public String toString() {
return "EndpointKey(z=" + address + ")";
}
}
}
Loading

0 comments on commit 5a17ab3

Please sign in to comment.