diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc
index f8baec3..5dd17ee 100644
--- a/src/main/asciidoc/index.adoc
+++ b/src/main/asciidoc/index.adoc
@@ -184,4 +184,3 @@ You can use WebSocket interceptor to wrap an interceptor to let it allow WebSock
----
{@link examples.HttpProxyExamples#webSocketInterceptorPath}
----
-
diff --git a/src/main/java/examples/HttpProxyExamples.java b/src/main/java/examples/HttpProxyExamples.java
index ecd3042..ffa19f1 100644
--- a/src/main/java/examples/HttpProxyExamples.java
+++ b/src/main/java/examples/HttpProxyExamples.java
@@ -44,20 +44,20 @@ public void proxy(Vertx vertx) {
proxyServer.requestHandler(proxy).listen(8080);
}
- private SocketAddress resolveOriginAddress(HttpServerRequest request) {
+ private SocketAddress resolveOriginAddress(ProxyContext proxyContext) {
return null;
}
public void originSelector(HttpProxy proxy) {
- proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(request)));
+ proxy.originSelector(proxyContext -> Future.succeededFuture(resolveOriginAddress(proxyContext)));
}
- private RequestOptions resolveOriginOptions(HttpServerRequest request) {
+ private RequestOptions resolveOriginOptions(ProxyContext proxyContext) {
return null;
}
public void originRequestProvider(HttpProxy proxy) {
- proxy.originRequestProvider((request, client) -> client.request(resolveOriginOptions(request)));
+ proxy.originRequestProvider((proxyContext, client) -> client.request(resolveOriginOptions(proxyContext)));
}
public void inboundInterceptor(HttpProxy proxy) {
diff --git a/src/main/java/io/vertx/httpproxy/HttpProxy.java b/src/main/java/io/vertx/httpproxy/HttpProxy.java
index 9137dda..c68e892 100644
--- a/src/main/java/io/vertx/httpproxy/HttpProxy.java
+++ b/src/main/java/io/vertx/httpproxy/HttpProxy.java
@@ -22,6 +22,8 @@
import io.vertx.core.net.SocketAddress;
import io.vertx.httpproxy.impl.ReverseProxy;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -76,6 +78,30 @@ default HttpProxy origin(int port, String host) {
return origin(SocketAddress.inetSocketAddress(port, host));
}
+// /**
+// * Set a selector that resolves the origin address based on the incoming HTTP request.
+// *
+// * @param selector the selector
+// * @return a reference to this, so the API can be used fluently
+// */
+// @Fluent
+// default HttpProxy originSelector(Function> selector) {
+// return originRequestProvider((req, client) -> selector
+// .apply(req)
+// .flatMap(server -> client.request(new RequestOptions().setServer(server))));
+// }
+//
+// /**
+// * Set a provider that creates the request to the origin server based the incoming HTTP request.
+// * Setting a provider overrides any origin selector previously set.
+// *
+// * @param provider the provider
+// * @return a reference to this, so the API can be used fluently
+// */
+// @GenIgnore()
+// @Fluent
+// HttpProxy originRequestProvider(BiFunction> provider);
+
/**
* Set a selector that resolves the origin address based on the incoming HTTP request.
*
@@ -83,9 +109,9 @@ default HttpProxy origin(int port, String host) {
* @return a reference to this, so the API can be used fluently
*/
@Fluent
- default HttpProxy originSelector(Function> selector) {
- return originRequestProvider((req, client) -> selector
- .apply(req)
+ default HttpProxy originSelector(Function> selector) {
+ return originRequestProvider((context, client) -> selector
+ .apply(context)
.flatMap(server -> client.request(new RequestOptions().setServer(server))));
}
@@ -98,7 +124,7 @@ default HttpProxy originSelector(Function> provider);
+ HttpProxy originRequestProvider(BiFunction> provider);
/**
* Add an interceptor to the interceptor chain.
@@ -114,6 +140,16 @@ default HttpProxy originSelector(Function());
+ }
+
+ /**
+ * Handle the outbound {@code HttpServerRequest}.
+ *
+ * @param request the outbound {@code HttpServerRequest}
+ * @param attachments the contextual data holder for {@code ProxyContext}. Must be mutable.
+ */
+ void handle(HttpServerRequest request, Map attachments);
}
diff --git a/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
index 22ca1cd..72191ea 100644
--- a/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
+++ b/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
@@ -29,7 +29,7 @@ public class ReverseProxy implements HttpProxy {
private final static Logger log = LoggerFactory.getLogger(ReverseProxy.class);
private final HttpClient client;
private final boolean supportWebSocket;
- private BiFunction> selector = (req, client) -> Future.failedFuture("No origin available");
+ private BiFunction> selector = (req, client) -> Future.failedFuture("No origin available");
private final List interceptors = new ArrayList<>();
public ReverseProxy(ProxyOptions options, HttpClient client) {
@@ -43,7 +43,7 @@ public ReverseProxy(ProxyOptions options, HttpClient client) {
}
@Override
- public HttpProxy originRequestProvider(BiFunction> provider) {
+ public HttpProxy originRequestProvider(BiFunction> provider) {
selector = provider;
return this;
}
@@ -56,7 +56,7 @@ public HttpProxy addInterceptor(ProxyInterceptor interceptor) {
@Override
- public void handle(HttpServerRequest request) {
+ public void handle(HttpServerRequest request, Map attachments) {
ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);
// Encoding sanity check
@@ -67,7 +67,7 @@ public void handle(HttpServerRequest request) {
}
boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket();
- Proxy proxy = new Proxy(proxyRequest, isWebSocket);
+ Proxy proxy = new Proxy(proxyRequest, isWebSocket, attachments);
proxy.filters = interceptors.listIterator();
proxy.sendRequest()
.recover(throwable -> {
@@ -91,21 +91,22 @@ private void end(ProxyRequest proxyRequest, int sc) {
.send();
}
- private Future resolveOrigin(HttpServerRequest proxiedRequest) {
- return selector.apply(proxiedRequest, client);
+ private Future resolveOrigin(ProxyContext context) {
+ return selector.apply(context, client);
}
private class Proxy implements ProxyContext {
private final ProxyRequest request;
private ProxyResponse response;
- private final Map attachments = new HashMap<>();
+ private final Map attachments;
private ListIterator filters;
private final boolean isWebSocket;
- private Proxy(ProxyRequest request, boolean isWebSocket) {
+ private Proxy(ProxyRequest request, boolean isWebSocket, Map attachments) {
this.request = request;
this.isWebSocket = isWebSocket;
+ this.attachments = attachments;
}
@Override
@@ -140,7 +141,7 @@ public Future sendRequest() {
} else {
if (isWebSocket) {
HttpServerRequest proxiedRequest = request().proxiedRequest();
- return resolveOrigin(proxiedRequest).compose(request -> {
+ return resolveOrigin(this).compose(request -> {
request.setMethod(request().getMethod());
request.setURI(request().getURI());
request.headers().addAll(request().headers());
@@ -202,7 +203,7 @@ public Future sendResponse() {
}
private Future sendProxyRequest(ProxyRequest proxyRequest) {
- return resolveOrigin(proxyRequest.proxiedRequest()).compose(proxyRequest::send);
+ return resolveOrigin(this).compose(proxyRequest::send);
}
private Future sendProxyResponse(ProxyResponse response) {
diff --git a/src/test/java/io/vertx/tests/ProxyContextTest.java b/src/test/java/io/vertx/tests/ProxyContextTest.java
new file mode 100644
index 0000000..44679cc
--- /dev/null
+++ b/src/test/java/io/vertx/tests/ProxyContextTest.java
@@ -0,0 +1,141 @@
+package io.vertx.tests;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.http.*;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.httpproxy.*;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+public class ProxyContextTest extends ProxyTestBase {
+
+ private WebSocketClient wsClient;
+
+ public ProxyContextTest(ProxyOptions options) {
+ super(options);
+ }
+
+ @Override
+ public void tearDown(TestContext context) {
+ super.tearDown(context);
+ wsClient = null;
+ }
+
+ // same in TestBase, but allow to attach contexts
+ private Closeable startProxy(Consumer config, Map attachments) {
+ CompletableFuture res = new CompletableFuture<>();
+ vertx.deployVerticle(new AbstractVerticle() {
+ HttpClient proxyClient;
+ HttpServer proxyServer;
+ HttpProxy proxy;
+ @Override
+ public void start(Promise startFuture) {
+ proxyClient = vertx.createHttpClient(new HttpClientOptions(clientOptions));
+ proxyServer = vertx.createHttpServer(new HttpServerOptions(serverOptions));
+ proxy = HttpProxy.reverseProxy(proxyOptions, proxyClient);
+ config.accept(proxy);
+ proxyServer.requestHandler(request -> {
+ proxy.handle(request, attachments);
+ });
+ proxyServer.listen().onComplete(ar -> startFuture.handle(ar.mapEmpty()));
+ }
+ }).onComplete(ar -> {
+ if (ar.succeeded()) {
+ String id = ar.result();
+ res.complete(() -> {
+ CountDownLatch latch = new CountDownLatch(1);
+ vertx.undeploy(id).onComplete(ar2 -> latch.countDown());
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError(e);
+ }
+ });
+ } else {
+ res.completeExceptionally(ar.cause());
+ }
+ });
+ try {
+ return res.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError(e);
+ } catch (ExecutionException e) {
+ throw new AssertionError(e.getMessage());
+ } catch (TimeoutException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Test
+ public void testOriginSelector(TestContext ctx) {
+ Async latch = ctx.async();
+ SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
+ req.response().end("end");
+ });
+ startProxy(proxy -> {
+ proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
+ }, new HashMap<>(Map.of("backend", backend)));
+ vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
+ .compose(HttpClientRequest::send)
+ .onComplete(ctx.asyncAssertSuccess(resp -> {
+ ctx.assertEquals(resp.statusCode(), 200);
+ latch.complete();
+ }));
+ }
+
+ @Test
+ public void testOriginSelectorWebSocket(TestContext ctx) {
+ Async latch = ctx.async();
+ SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
+ req.toWebSocket().onSuccess(ws -> {
+ ws.handler(ws::write);
+ });
+ });
+ startProxy(proxy -> {
+ proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
+ }, new HashMap<>(Map.of("backend", backend)));
+ wsClient = vertx.createWebSocketClient();
+ wsClient.connect(8080, "localhost", "/")
+ .onComplete(ctx.asyncAssertSuccess(ws -> {
+ latch.complete();
+ }));
+ }
+
+ @Test
+ public void testInterceptor(TestContext ctx) {
+ Async latch = ctx.async();
+ SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
+ if (!req.uri().equals("/new-uri")) {
+ req.response().setStatusCode(404).end();
+ }
+ req.response().end("end");
+ });
+ startProxy(proxy -> {
+ proxy.origin(backend)
+ .addInterceptor(new ProxyInterceptor() {
+ @Override
+ public Future handleProxyRequest(ProxyContext context) {
+ context.request().setURI(context.get("uri", String.class));
+ return context.sendRequest();
+ }
+ });
+ }, new HashMap<>(Map.of("uri", "/new-uri")));
+ vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
+ .compose(HttpClientRequest::send)
+ .onComplete(ctx.asyncAssertSuccess(resp -> {
+ ctx.assertEquals(resp.statusCode(), 200);
+ latch.complete();
+ }));
+ }
+}
diff --git a/src/test/java/io/vertx/tests/WebSocketCacheTest.java b/src/test/java/io/vertx/tests/WebSocketCacheTest.java
index 910ed60..43fd8b0 100644
--- a/src/test/java/io/vertx/tests/WebSocketCacheTest.java
+++ b/src/test/java/io/vertx/tests/WebSocketCacheTest.java
@@ -45,3 +45,4 @@ public void testWsWithCache(TestContext ctx) {
}));
}
}
+