From 14b2bfca46b27235d3a8350ce9cf63d4e9fb1ab5 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 6 Jun 2022 11:27:21 +0300 Subject: [PATCH] Http2StreamChannelBootstrap can be shared (#2257) Http2StreamFrameToHttpObjectCodec can be shared Http2StreamBridgeClientHandler can be shared Related to #2151 and #2262 --- .../http/client/Http2ConnectionProvider.java | 29 +++- .../Http2StreamBridgeClientHandler.java | 27 +--- .../netty/http/client/HttpClientConfig.java | 128 ++++++++++-------- .../netty/http/server/HttpServerConfig.java | 5 +- .../netty/http/HttpProtocolsTests.java | 2 +- 5 files changed, 102 insertions(+), 89 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 69f3f5ca7e..8ef73db2fc 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -21,6 +21,7 @@ import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; @@ -300,8 +301,7 @@ else if (p.state != null) { return; } - HttpClientConfig.openStream(channel, this, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue) - .addListener(this); + http2StreamChannelBootstrap(channel).open().addListener(this); } @Override @@ -354,6 +354,10 @@ public void operationComplete(Future future) { } } else { + Http2ConnectionProvider.registerClose(ch, this); + HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())), + opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue); + ChannelOperations ops = ChannelOperations.get(ch); if (ops != null) { obs.onStateChange(ops, STREAM_CONFIGURED); @@ -420,6 +424,27 @@ else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == n } return false; } + + static final AttributeKey HTTP2_STREAM_CHANNEL_BOOTSTRAP = + AttributeKey.valueOf("http2StreamChannelBootstrap"); + + static Http2StreamChannelBootstrap http2StreamChannelBootstrap(Channel channel) { + Http2StreamChannelBootstrap http2StreamChannelBootstrap; + + for (;;) { + http2StreamChannelBootstrap = channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP).get(); + if (http2StreamChannelBootstrap == null) { + http2StreamChannelBootstrap = new Http2StreamChannelBootstrap(channel); + } + else { + return http2StreamChannelBootstrap; + } + if (channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP) + .compareAndSet(null, http2StreamChannelBootstrap)) { + return http2StreamChannelBootstrap; + } + } + } } static final class PendingConnectionObserver implements ConnectionObserver { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java index a432f55d41..f1808b573a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java @@ -17,15 +17,11 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; -import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.ChannelOperations; - -import static reactor.netty.ReactorNetty.format; /** * This handler is intended to work together with {@link Http2StreamFrameToHttpObjectCodec} @@ -35,33 +31,14 @@ * @author Violeta Georgieva * @since 1.0.0 */ +@ChannelHandler.Sharable final class Http2StreamBridgeClientHandler extends ChannelDuplexHandler { - final ConnectionObserver observer; - final ChannelOperations.OnSetup opsFactory; - - Http2StreamBridgeClientHandler(ConnectionObserver listener, ChannelOperations.OnSetup opsFactory) { - this.observer = listener; - this.opsFactory = opsFactory; - } - @Override public void channelActive(ChannelHandlerContext ctx) { ctx.read(); } - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - if (HttpClientOperations.log.isDebugEnabled()) { - HttpClientOperations.log.debug(format(ctx.channel(), "New HTTP/2 stream")); - } - - ChannelOperations ops = opsFactory.create(Connection.from(ctx.channel()), observer, null); - if (ops != null) { - ops.bind(); - } - } - @Override @SuppressWarnings("FutureReturnValueIgnored") public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 3eeceabf3d..0778725454 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -52,8 +52,6 @@ import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2MultiplexHandler; import io.netty.handler.codec.http2.Http2Settings; -import io.netty.handler.codec.http2.Http2StreamChannel; -import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -61,7 +59,6 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.resolver.AddressResolverGroup; -import io.netty.util.concurrent.Future; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.ChannelPipelineConfigurer; @@ -504,6 +501,65 @@ Http2Settings http2Settings() { return settings; } + static void addStreamHandlers( + Channel ch, + ConnectionObserver obs, + ChannelOperations.OnSetup opsFactory, + boolean acceptGzip, + @Nullable ChannelMetricsRecorder metricsRecorder, + long responseTimeoutMillis, + @Nullable Function uriTagValue) { + + if (HttpClientOperations.log.isDebugEnabled()) { + HttpClientOperations.log.debug(format(ch, "New HTTP/2 stream")); + } + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) + .addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER); + + if (acceptGzip) { + pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); + } + + ChannelOperations.addReactiveBridge(ch, opsFactory, obs); + + if (metricsRecorder != null) { + if (metricsRecorder instanceof HttpClientMetricsRecorder) { + ChannelHandler handler; + Channel parent = ch.parent(); + ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler); + if (existingHandler != null) { + // This use case can happen only in HTTP/2 clear text connection upgrade + parent.pipeline().remove(NettyPipeline.HttpMetricsHandler); + handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? + new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) : + new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler); + } + else { + handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? + new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) : + new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue); + } + pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); + } + } + + if (responseTimeoutMillis > -1) { + Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, + new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); + } + + if (log.isDebugEnabled()) { + log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline()); + } + + ChannelOperations ops = opsFactory.create(Connection.from(ch), obs, null); + if (ops != null) { + ops.bind(); + } + } + static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder, Http2Settings http2Settings, ConnectionObserver observer) { Http2FrameCodecBuilder http2FrameCodecBuilder = @@ -608,20 +664,6 @@ static void configureHttp11Pipeline(ChannelPipeline p, } } - static Future openStream( - Channel channel, - Http2ConnectionProvider.DisposableAcquire owner, - ConnectionObserver observer, - ChannelOperations.OnSetup opsFactory, - boolean acceptGzip, - @Nullable ChannelMetricsRecorder metricsRecorder, - @Nullable Function uriTagValue) { - Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel); - bootstrap.option(ChannelOption.AUTO_READ, false); - bootstrap.handler(new H2Codec(owner, observer, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); - return bootstrap.open(); - } - static final Pattern FOLLOW_REDIRECT_CODES = Pattern.compile("30[12378]"); static final BiPredicate FOLLOW_REDIRECT_PREDICATE = @@ -639,6 +681,12 @@ static Future openStream( static final int h11orH2C = h11 | h2c; + static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT = + new Http2StreamFrameToHttpObjectCodec(false); + + static final Http2StreamBridgeClientHandler HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER = + new Http2StreamBridgeClientHandler(); + static final Logger log = Loggers.getLogger(HttpClientConfig.class); static final LoggingHandler LOGGING_HANDLER = @@ -706,6 +754,7 @@ public void handlerAdded(ChannelHandlerContext ctx) { } static final class H2Codec extends ChannelInitializer { + final boolean acceptGzip; final ChannelMetricsRecorder metricsRecorder; final ConnectionObserver observer; @@ -759,55 +808,14 @@ static final class H2Codec extends ChannelInitializer { protected void initChannel(Channel ch) { if (observer != null && opsFactory != null && owner != null) { Http2ConnectionProvider.registerClose(ch, owner); - addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory); + addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory, + acceptGzip, metricsRecorder, responseTimeoutMillis, uriTagValue); } else { // Handle server pushes (inbound streams) // TODO this is not supported } } - - void addStreamHandlers(Channel ch, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory) { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(false)) - .addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeClientHandler(obs, opsFactory)); - - if (acceptGzip) { - pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); - } - - ChannelOperations.addReactiveBridge(ch, opsFactory, obs); - - if (metricsRecorder != null) { - if (metricsRecorder instanceof HttpClientMetricsRecorder) { - ChannelHandler handler; - Channel parent = ch.parent(); - ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler); - if (existingHandler != null) { - // This use case can happen only in HTTP/2 clear text connection upgrade - parent.pipeline().remove(NettyPipeline.HttpMetricsHandler); - handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? - new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) : - new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler); - } - else { - handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? - new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) : - new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue); - } - pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); - } - } - - if (responseTimeoutMillis > -1) { - Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, - new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); - } - - if (log.isDebugEnabled()) { - log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline()); - } - } } static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 6cda411430..9e05323be3 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -414,7 +414,7 @@ static void addStreamHandlers(Channel ch, if (accessLogEnabled) { pipeline.addLast(NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H2.create(accessLog)); } - pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(true)) + pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) .addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeServerHandler(compressPredicate, decoder, encoder, formDecoderProvider, forwardedHeaderHandler, listener, mapHandle)); @@ -664,6 +664,9 @@ static void configureHttp11Pipeline(ChannelPipeline p, static final int h11orH2C = h11 | h2c; + static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT = + new Http2StreamFrameToHttpObjectCodec(true); + static final Logger log = Loggers.getLogger(HttpServerConfig.class); static final LoggingHandler LOGGING_HANDLER = diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java index 22d6210df9..328715fbe0 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java @@ -407,7 +407,7 @@ private void doTestResponseTimeout(HttpClient client, long expectedTimeout) timeout.set(((ReadTimeoutHandler) handler).getReaderIdleTimeInMillis()); } }) - .doOnDisconnected(conn -> onDisconnected.set(handlerAvailable.test(conn))); + .doOnDisconnected(conn -> onDisconnected.set(conn.channel().isActive() && handlerAvailable.test(conn))); Mono response = localClient.get()