From abdf39a6c53e37549ec589863cba92909dcb99b0 Mon Sep 17 00:00:00 2001 From: Zengyi Wang Date: Tue, 30 Jul 2024 03:09:48 +0800 Subject: [PATCH 1/5] apply interceptors for socket handshake --- .../httpproxy/interceptors/impl/BodyInterceptorImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java b/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java index 4e53987..3bf76e4 100644 --- a/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java +++ b/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java @@ -35,6 +35,8 @@ public BodyInterceptorImpl(Function modifyRequestBody, Function< @Override public Future handleProxyRequest(ProxyContext context) { + if (context.isWebSocket()) return context.sendRequest(); + Body body = context.request().getBody(); BufferingWriteStream bws = new BufferingWriteStream(); return body.stream().pipeTo(bws).compose(r -> { @@ -46,6 +48,8 @@ public Future handleProxyRequest(ProxyContext context) { @Override public Future handleProxyResponse(ProxyContext context) { + if (context.isWebSocket()) return context.sendResponse(); + Body body = context.response().getBody(); BufferingWriteStream bws = new BufferingWriteStream(); return body.stream().pipeTo(bws).compose(r -> { From d9a715d0241e084a2ba39ed0f3bdc230de5d81bd Mon Sep 17 00:00:00 2001 From: Zengyi Wang Date: Tue, 30 Jul 2024 12:56:01 +0800 Subject: [PATCH 2/5] configuration for applying websocket --- .../httpproxy/interceptors/impl/BodyInterceptorImpl.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java b/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java index 3bf76e4..4e53987 100644 --- a/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java +++ b/src/main/java/io/vertx/httpproxy/interceptors/impl/BodyInterceptorImpl.java @@ -35,8 +35,6 @@ public BodyInterceptorImpl(Function modifyRequestBody, Function< @Override public Future handleProxyRequest(ProxyContext context) { - if (context.isWebSocket()) return context.sendRequest(); - Body body = context.request().getBody(); BufferingWriteStream bws = new BufferingWriteStream(); return body.stream().pipeTo(bws).compose(r -> { @@ -48,8 +46,6 @@ public Future handleProxyRequest(ProxyContext context) { @Override public Future handleProxyResponse(ProxyContext context) { - if (context.isWebSocket()) return context.sendResponse(); - Body body = context.response().getBody(); BufferingWriteStream bws = new BufferingWriteStream(); return body.stream().pipeTo(bws).compose(r -> { From 893755e15e6b7f7c1d17fe93086568c6061a935d Mon Sep 17 00:00:00 2001 From: Zengyi Wang Date: Wed, 31 Jul 2024 17:23:12 +0800 Subject: [PATCH 3/5] add test and doc --- src/main/asciidoc/index.adoc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index f8baec3..5dd17ee 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -184,4 +184,3 @@ You can use WebSocket interceptor to wrap an interceptor to let it allow WebSock ---- {@link examples.HttpProxyExamples#webSocketInterceptorPath} ---- - From 756e0d0f7d873380a00db2e29955786cc10eee53 Mon Sep 17 00:00:00 2001 From: Zengyi Wang Date: Tue, 6 Aug 2024 13:34:25 +0800 Subject: [PATCH 4/5] fix batch 1 --- src/test/java/io/vertx/tests/WebSocketCacheTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/io/vertx/tests/WebSocketCacheTest.java b/src/test/java/io/vertx/tests/WebSocketCacheTest.java index 910ed60..43fd8b0 100644 --- a/src/test/java/io/vertx/tests/WebSocketCacheTest.java +++ b/src/test/java/io/vertx/tests/WebSocketCacheTest.java @@ -45,3 +45,4 @@ public void testWsWithCache(TestContext ctx) { })); } } + From c9471280a3f327869b62cff63098e68e6fff6052 Mon Sep 17 00:00:00 2001 From: Zengyi Wang Date: Thu, 8 Aug 2024 21:47:51 +0800 Subject: [PATCH 5/5] add general context data --- src/main/java/examples/HttpProxyExamples.java | 8 +- .../java/io/vertx/httpproxy/HttpProxy.java | 46 +++++- .../io/vertx/httpproxy/impl/ReverseProxy.java | 21 +-- .../java/io/vertx/tests/ProxyContextTest.java | 141 ++++++++++++++++++ 4 files changed, 197 insertions(+), 19 deletions(-) create mode 100644 src/test/java/io/vertx/tests/ProxyContextTest.java diff --git a/src/main/java/examples/HttpProxyExamples.java b/src/main/java/examples/HttpProxyExamples.java index ecd3042..ffa19f1 100644 --- a/src/main/java/examples/HttpProxyExamples.java +++ b/src/main/java/examples/HttpProxyExamples.java @@ -44,20 +44,20 @@ public void proxy(Vertx vertx) { proxyServer.requestHandler(proxy).listen(8080); } - private SocketAddress resolveOriginAddress(HttpServerRequest request) { + private SocketAddress resolveOriginAddress(ProxyContext proxyContext) { return null; } public void originSelector(HttpProxy proxy) { - proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(request))); + proxy.originSelector(proxyContext -> Future.succeededFuture(resolveOriginAddress(proxyContext))); } - private RequestOptions resolveOriginOptions(HttpServerRequest request) { + private RequestOptions resolveOriginOptions(ProxyContext proxyContext) { return null; } public void originRequestProvider(HttpProxy proxy) { - proxy.originRequestProvider((request, client) -> client.request(resolveOriginOptions(request))); + proxy.originRequestProvider((proxyContext, client) -> client.request(resolveOriginOptions(proxyContext))); } public void inboundInterceptor(HttpProxy proxy) { diff --git a/src/main/java/io/vertx/httpproxy/HttpProxy.java b/src/main/java/io/vertx/httpproxy/HttpProxy.java index 9137dda..c68e892 100644 --- a/src/main/java/io/vertx/httpproxy/HttpProxy.java +++ b/src/main/java/io/vertx/httpproxy/HttpProxy.java @@ -22,6 +22,8 @@ import io.vertx.core.net.SocketAddress; import io.vertx.httpproxy.impl.ReverseProxy; +import java.util.HashMap; +import java.util.Map; import java.util.function.BiFunction; import java.util.function.Function; @@ -76,6 +78,30 @@ default HttpProxy origin(int port, String host) { return origin(SocketAddress.inetSocketAddress(port, host)); } +// /** +// * Set a selector that resolves the origin address based on the incoming HTTP request. +// * +// * @param selector the selector +// * @return a reference to this, so the API can be used fluently +// */ +// @Fluent +// default HttpProxy originSelector(Function> selector) { +// return originRequestProvider((req, client) -> selector +// .apply(req) +// .flatMap(server -> client.request(new RequestOptions().setServer(server)))); +// } +// +// /** +// * Set a provider that creates the request to the origin server based the incoming HTTP request. +// * Setting a provider overrides any origin selector previously set. +// * +// * @param provider the provider +// * @return a reference to this, so the API can be used fluently +// */ +// @GenIgnore() +// @Fluent +// HttpProxy originRequestProvider(BiFunction> provider); + /** * Set a selector that resolves the origin address based on the incoming HTTP request. * @@ -83,9 +109,9 @@ default HttpProxy origin(int port, String host) { * @return a reference to this, so the API can be used fluently */ @Fluent - default HttpProxy originSelector(Function> selector) { - return originRequestProvider((req, client) -> selector - .apply(req) + default HttpProxy originSelector(Function> selector) { + return originRequestProvider((context, client) -> selector + .apply(context) .flatMap(server -> client.request(new RequestOptions().setServer(server)))); } @@ -98,7 +124,7 @@ default HttpProxy originSelector(Function> provider); + HttpProxy originRequestProvider(BiFunction> provider); /** * Add an interceptor to the interceptor chain. @@ -114,6 +140,16 @@ default HttpProxy originSelector(Function()); + } + + /** + * Handle the outbound {@code HttpServerRequest}. + * + * @param request the outbound {@code HttpServerRequest} + * @param attachments the contextual data holder for {@code ProxyContext}. Must be mutable. + */ + void handle(HttpServerRequest request, Map attachments); } diff --git a/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index 22ca1cd..72191ea 100644 --- a/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -29,7 +29,7 @@ public class ReverseProxy implements HttpProxy { private final static Logger log = LoggerFactory.getLogger(ReverseProxy.class); private final HttpClient client; private final boolean supportWebSocket; - private BiFunction> selector = (req, client) -> Future.failedFuture("No origin available"); + private BiFunction> selector = (req, client) -> Future.failedFuture("No origin available"); private final List interceptors = new ArrayList<>(); public ReverseProxy(ProxyOptions options, HttpClient client) { @@ -43,7 +43,7 @@ public ReverseProxy(ProxyOptions options, HttpClient client) { } @Override - public HttpProxy originRequestProvider(BiFunction> provider) { + public HttpProxy originRequestProvider(BiFunction> provider) { selector = provider; return this; } @@ -56,7 +56,7 @@ public HttpProxy addInterceptor(ProxyInterceptor interceptor) { @Override - public void handle(HttpServerRequest request) { + public void handle(HttpServerRequest request, Map attachments) { ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request); // Encoding sanity check @@ -67,7 +67,7 @@ public void handle(HttpServerRequest request) { } boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket(); - Proxy proxy = new Proxy(proxyRequest, isWebSocket); + Proxy proxy = new Proxy(proxyRequest, isWebSocket, attachments); proxy.filters = interceptors.listIterator(); proxy.sendRequest() .recover(throwable -> { @@ -91,21 +91,22 @@ private void end(ProxyRequest proxyRequest, int sc) { .send(); } - private Future resolveOrigin(HttpServerRequest proxiedRequest) { - return selector.apply(proxiedRequest, client); + private Future resolveOrigin(ProxyContext context) { + return selector.apply(context, client); } private class Proxy implements ProxyContext { private final ProxyRequest request; private ProxyResponse response; - private final Map attachments = new HashMap<>(); + private final Map attachments; private ListIterator filters; private final boolean isWebSocket; - private Proxy(ProxyRequest request, boolean isWebSocket) { + private Proxy(ProxyRequest request, boolean isWebSocket, Map attachments) { this.request = request; this.isWebSocket = isWebSocket; + this.attachments = attachments; } @Override @@ -140,7 +141,7 @@ public Future sendRequest() { } else { if (isWebSocket) { HttpServerRequest proxiedRequest = request().proxiedRequest(); - return resolveOrigin(proxiedRequest).compose(request -> { + return resolveOrigin(this).compose(request -> { request.setMethod(request().getMethod()); request.setURI(request().getURI()); request.headers().addAll(request().headers()); @@ -202,7 +203,7 @@ public Future sendResponse() { } private Future sendProxyRequest(ProxyRequest proxyRequest) { - return resolveOrigin(proxyRequest.proxiedRequest()).compose(proxyRequest::send); + return resolveOrigin(this).compose(proxyRequest::send); } private Future sendProxyResponse(ProxyResponse response) { diff --git a/src/test/java/io/vertx/tests/ProxyContextTest.java b/src/test/java/io/vertx/tests/ProxyContextTest.java new file mode 100644 index 0000000..44679cc --- /dev/null +++ b/src/test/java/io/vertx/tests/ProxyContextTest.java @@ -0,0 +1,141 @@ +package io.vertx.tests; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.*; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.httpproxy.*; +import org.junit.Test; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Consumer; + +public class ProxyContextTest extends ProxyTestBase { + + private WebSocketClient wsClient; + + public ProxyContextTest(ProxyOptions options) { + super(options); + } + + @Override + public void tearDown(TestContext context) { + super.tearDown(context); + wsClient = null; + } + + // same in TestBase, but allow to attach contexts + private Closeable startProxy(Consumer config, Map attachments) { + CompletableFuture res = new CompletableFuture<>(); + vertx.deployVerticle(new AbstractVerticle() { + HttpClient proxyClient; + HttpServer proxyServer; + HttpProxy proxy; + @Override + public void start(Promise startFuture) { + proxyClient = vertx.createHttpClient(new HttpClientOptions(clientOptions)); + proxyServer = vertx.createHttpServer(new HttpServerOptions(serverOptions)); + proxy = HttpProxy.reverseProxy(proxyOptions, proxyClient); + config.accept(proxy); + proxyServer.requestHandler(request -> { + proxy.handle(request, attachments); + }); + proxyServer.listen().onComplete(ar -> startFuture.handle(ar.mapEmpty())); + } + }).onComplete(ar -> { + if (ar.succeeded()) { + String id = ar.result(); + res.complete(() -> { + CountDownLatch latch = new CountDownLatch(1); + vertx.undeploy(id).onComplete(ar2 -> latch.countDown()); + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + }); + } else { + res.completeExceptionally(ar.cause()); + } + }); + try { + return res.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } catch (ExecutionException e) { + throw new AssertionError(e.getMessage()); + } catch (TimeoutException e) { + throw new AssertionError(e); + } + } + + @Test + public void testOriginSelector(TestContext ctx) { + Async latch = ctx.async(); + SocketAddress backend = startHttpBackend(ctx, 8081, req -> { + req.response().end("end"); + }); + startProxy(proxy -> { + proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class))); + }, new HashMap<>(Map.of("backend", backend))); + vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") + .compose(HttpClientRequest::send) + .onComplete(ctx.asyncAssertSuccess(resp -> { + ctx.assertEquals(resp.statusCode(), 200); + latch.complete(); + })); + } + + @Test + public void testOriginSelectorWebSocket(TestContext ctx) { + Async latch = ctx.async(); + SocketAddress backend = startHttpBackend(ctx, 8081, req -> { + req.toWebSocket().onSuccess(ws -> { + ws.handler(ws::write); + }); + }); + startProxy(proxy -> { + proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class))); + }, new HashMap<>(Map.of("backend", backend))); + wsClient = vertx.createWebSocketClient(); + wsClient.connect(8080, "localhost", "/") + .onComplete(ctx.asyncAssertSuccess(ws -> { + latch.complete(); + })); + } + + @Test + public void testInterceptor(TestContext ctx) { + Async latch = ctx.async(); + SocketAddress backend = startHttpBackend(ctx, 8081, req -> { + if (!req.uri().equals("/new-uri")) { + req.response().setStatusCode(404).end(); + } + req.response().end("end"); + }); + startProxy(proxy -> { + proxy.origin(backend) + .addInterceptor(new ProxyInterceptor() { + @Override + public Future handleProxyRequest(ProxyContext context) { + context.request().setURI(context.get("uri", String.class)); + return context.sendRequest(); + } + }); + }, new HashMap<>(Map.of("uri", "/new-uri"))); + vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") + .compose(HttpClientRequest::send) + .onComplete(ctx.asyncAssertSuccess(resp -> { + ctx.assertEquals(resp.statusCode(), 200); + latch.complete(); + })); + } +}