Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add general contextual data #96

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,3 @@ You can use WebSocket interceptor to wrap an interceptor to let it allow WebSock
----
{@link examples.HttpProxyExamples#webSocketInterceptorPath}
----

8 changes: 4 additions & 4 deletions src/main/java/examples/HttpProxyExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public void proxy(Vertx vertx) {
proxyServer.requestHandler(proxy).listen(8080);
}

private SocketAddress resolveOriginAddress(HttpServerRequest request) {
private SocketAddress resolveOriginAddress(ProxyContext proxyContext) {
return null;
}

public void originSelector(HttpProxy proxy) {
proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(request)));
proxy.originSelector(proxyContext -> Future.succeededFuture(resolveOriginAddress(proxyContext)));
}

private RequestOptions resolveOriginOptions(HttpServerRequest request) {
private RequestOptions resolveOriginOptions(ProxyContext proxyContext) {
return null;
}

public void originRequestProvider(HttpProxy proxy) {
proxy.originRequestProvider((request, client) -> client.request(resolveOriginOptions(request)));
proxy.originRequestProvider((proxyContext, client) -> client.request(resolveOriginOptions(proxyContext)));
}

public void inboundInterceptor(HttpProxy proxy) {
Expand Down
46 changes: 41 additions & 5 deletions src/main/java/io/vertx/httpproxy/HttpProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.vertx.core.net.SocketAddress;
import io.vertx.httpproxy.impl.ReverseProxy;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -76,16 +78,40 @@ default HttpProxy origin(int port, String host) {
return origin(SocketAddress.inetSocketAddress(port, host));
}

// /**
// * Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
// *
// * @param selector the selector
// * @return a reference to this, so the API can be used fluently
// */
// @Fluent
// default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
// return originRequestProvider((req, client) -> selector
// .apply(req)
// .flatMap(server -> client.request(new RequestOptions().setServer(server))));
// }
//
// /**
// * Set a provider that creates the request to the <i><b>origin</b></i> server based the incoming HTTP request.
// * Setting a provider overrides any origin selector previously set.
// *
// * @param provider the provider
// * @return a reference to this, so the API can be used fluently
// */
// @GenIgnore()
// @Fluent
// HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);

/**
* Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
*
* @param selector the selector
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
return originRequestProvider((req, client) -> selector
.apply(req)
default HttpProxy originSelector(Function<ProxyContext, Future<SocketAddress>> selector) {
return originRequestProvider((context, client) -> selector
.apply(context)
.flatMap(server -> client.request(new RequestOptions().setServer(server))));
}

Expand All @@ -98,7 +124,7 @@ default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddres
*/
@GenIgnore()
@Fluent
HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);
HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider);

/**
* Add an interceptor to the interceptor chain.
Expand All @@ -114,6 +140,16 @@ default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddres
*
* @param request the outbound {@code HttpServerRequest}
*/
void handle(HttpServerRequest request);
default void handle(HttpServerRequest request) {
handle(request, new HashMap<>());
}

/**
* Handle the <i><b>outbound</b></i> {@code HttpServerRequest}.
*
* @param request the outbound {@code HttpServerRequest}
* @param attachments the contextual data holder for {@code ProxyContext}. Must be mutable.
*/
void handle(HttpServerRequest request, Map<String, Object> attachments);

}
21 changes: 11 additions & 10 deletions src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ReverseProxy implements HttpProxy {
private final static Logger log = LoggerFactory.getLogger(ReverseProxy.class);
private final HttpClient client;
private final boolean supportWebSocket;
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private final List<ProxyInterceptor> interceptors = new ArrayList<>();

public ReverseProxy(ProxyOptions options, HttpClient client) {
Expand All @@ -43,7 +43,7 @@ public ReverseProxy(ProxyOptions options, HttpClient client) {
}

@Override
public HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
public HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider) {
selector = provider;
return this;
}
Expand All @@ -56,7 +56,7 @@ public HttpProxy addInterceptor(ProxyInterceptor interceptor) {


@Override
public void handle(HttpServerRequest request) {
public void handle(HttpServerRequest request, Map<String, Object> attachments) {
ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);

// Encoding sanity check
Expand All @@ -67,7 +67,7 @@ public void handle(HttpServerRequest request) {
}

boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket();
Proxy proxy = new Proxy(proxyRequest, isWebSocket);
Proxy proxy = new Proxy(proxyRequest, isWebSocket, attachments);
proxy.filters = interceptors.listIterator();
proxy.sendRequest()
.recover(throwable -> {
Expand All @@ -91,21 +91,22 @@ private void end(ProxyRequest proxyRequest, int sc) {
.send();
}

private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
return selector.apply(proxiedRequest, client);
private Future<HttpClientRequest> resolveOrigin(ProxyContext context) {
return selector.apply(context, client);
}

private class Proxy implements ProxyContext {

private final ProxyRequest request;
private ProxyResponse response;
private final Map<String, Object> attachments = new HashMap<>();
private final Map<String, Object> attachments;
private ListIterator<ProxyInterceptor> filters;
private final boolean isWebSocket;

private Proxy(ProxyRequest request, boolean isWebSocket) {
private Proxy(ProxyRequest request, boolean isWebSocket, Map<String, Object> attachments) {
this.request = request;
this.isWebSocket = isWebSocket;
this.attachments = attachments;
}

@Override
Expand Down Expand Up @@ -140,7 +141,7 @@ public Future<ProxyResponse> sendRequest() {
} else {
if (isWebSocket) {
HttpServerRequest proxiedRequest = request().proxiedRequest();
return resolveOrigin(proxiedRequest).compose(request -> {
return resolveOrigin(this).compose(request -> {
request.setMethod(request().getMethod());
request.setURI(request().getURI());
request.headers().addAll(request().headers());
Expand Down Expand Up @@ -202,7 +203,7 @@ public Future<Void> sendResponse() {
}

private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
return resolveOrigin(proxyRequest.proxiedRequest()).compose(proxyRequest::send);
return resolveOrigin(this).compose(proxyRequest::send);
}

private Future<Void> sendProxyResponse(ProxyResponse response) {
Expand Down
141 changes: 141 additions & 0 deletions src/test/java/io/vertx/tests/ProxyContextTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.vertx.tests;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.*;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.httpproxy.*;
import org.junit.Test;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class ProxyContextTest extends ProxyTestBase {

private WebSocketClient wsClient;

public ProxyContextTest(ProxyOptions options) {
super(options);
}

@Override
public void tearDown(TestContext context) {
super.tearDown(context);
wsClient = null;
}

// same in TestBase, but allow to attach contexts
private Closeable startProxy(Consumer<HttpProxy> config, Map<String, Object> attachments) {
CompletableFuture<Closeable> res = new CompletableFuture<>();
vertx.deployVerticle(new AbstractVerticle() {
HttpClient proxyClient;
HttpServer proxyServer;
HttpProxy proxy;
@Override
public void start(Promise<Void> startFuture) {
proxyClient = vertx.createHttpClient(new HttpClientOptions(clientOptions));
proxyServer = vertx.createHttpServer(new HttpServerOptions(serverOptions));
proxy = HttpProxy.reverseProxy(proxyOptions, proxyClient);
config.accept(proxy);
proxyServer.requestHandler(request -> {
proxy.handle(request, attachments);
});
proxyServer.listen().onComplete(ar -> startFuture.handle(ar.mapEmpty()));
}
}).onComplete(ar -> {
if (ar.succeeded()) {
String id = ar.result();
res.complete(() -> {
CountDownLatch latch = new CountDownLatch(1);
vertx.undeploy(id).onComplete(ar2 -> latch.countDown());
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
});
} else {
res.completeExceptionally(ar.cause());
}
});
try {
return res.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
} catch (ExecutionException e) {
throw new AssertionError(e.getMessage());
} catch (TimeoutException e) {
throw new AssertionError(e);
}
}

@Test
public void testOriginSelector(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
req.response().end("end");
});
startProxy(proxy -> {
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
}, new HashMap<>(Map.of("backend", backend)));
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(HttpClientRequest::send)
.onComplete(ctx.asyncAssertSuccess(resp -> {
ctx.assertEquals(resp.statusCode(), 200);
latch.complete();
}));
}

@Test
public void testOriginSelectorWebSocket(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
req.toWebSocket().onSuccess(ws -> {
ws.handler(ws::write);
});
});
startProxy(proxy -> {
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
}, new HashMap<>(Map.of("backend", backend)));
wsClient = vertx.createWebSocketClient();
wsClient.connect(8080, "localhost", "/")
.onComplete(ctx.asyncAssertSuccess(ws -> {
latch.complete();
}));
}

@Test
public void testInterceptor(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
if (!req.uri().equals("/new-uri")) {
req.response().setStatusCode(404).end();
}
req.response().end("end");
});
startProxy(proxy -> {
proxy.origin(backend)
.addInterceptor(new ProxyInterceptor() {
@Override
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
context.request().setURI(context.get("uri", String.class));
return context.sendRequest();
}
});
}, new HashMap<>(Map.of("uri", "/new-uri")));
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(HttpClientRequest::send)
.onComplete(ctx.asyncAssertSuccess(resp -> {
ctx.assertEquals(resp.statusCode(), 200);
latch.complete();
}));
}
}
1 change: 1 addition & 0 deletions src/test/java/io/vertx/tests/WebSocketCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ public void testWsWithCache(TestContext ctx) {
}));
}
}

Loading