From 9f6d49aef4962bcffb817a2d647a76c4d1b5a147 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 7 Nov 2023 12:07:27 +0100 Subject: [PATCH 1/3] Implemented flow limiter handler to explicitly pull byte buffers --- .../logstash/beats/FlowLimiterHandler.java | 57 +++++ src/main/java/org/logstash/beats/Server.java | 1 + .../beats/FlowLimiterHandlerTest.java | 197 ++++++++++++++++++ 3 files changed, 255 insertions(+) create mode 100644 src/main/java/org/logstash/beats/FlowLimiterHandler.java create mode 100644 src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java new file mode 100644 index 00000000..1ac48d5b --- /dev/null +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -0,0 +1,57 @@ +package org.logstash.beats; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Configure the channel where it's installed to operate the reads in pull mode, + * disabling the autoread and explicitly invoking the read operation. + * The flow control to keep the outgoing buffer under control is done + * avoiding to read in new bytes if the outgoing direction became not writable, this + * excert back pressure to the TCP layer and ultimately to the upstream system. + * */ +@Sharable +public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { + + private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class); + + @Override + public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { + ctx.channel().config().setAutoRead(false); + super.channelRegistered(ctx); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) { + ctx.channel().read(); + } + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) { + ctx.channel().read(); + } + } + + private boolean isAutoreadDisabled(Channel channel) { + return !channel.config().isAutoRead(); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ctx.channel().read(); + super.channelWritabilityChanged(ctx); + + logger.debug("Writability on channel {} changed to {}", ctx.channel(), ctx.channel().isWritable()); + } + +} + diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index c343aaf6..ab55f1ba 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -134,6 +134,7 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); + pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener)); } diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java new file mode 100644 index 00000000..d37370fa --- /dev/null +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ -0,0 +1,197 @@ +package org.logstash.beats; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class FlowLimiterHandlerTest { + + private ReadMessagesCollector readMessagesCollector; + + private static ByteBuf prepareSample(int numBytes) { + return prepareSample(numBytes, 'A'); + } + + private static ByteBuf prepareSample(int numBytes, char c) { + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(numBytes); + for (int i = 0; i < numBytes; i++) { + payload.writeByte(c); + } + return payload; + } + + private ChannelInboundHandlerAdapter onClientConnected(Consumer action) { + return new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + action.accept(ctx); + } + }; + } + + private static class ReadMessagesCollector extends SimpleChannelInboundHandler { + private Channel clientChannel; + private final NioEventLoopGroup group; + boolean firstChunkRead = false; + + ReadMessagesCollector(NioEventLoopGroup group) { + this.group = group; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + if (!firstChunkRead) { + assertEquals("Expect to read a first chunk and no others", 32, msg.readableBytes()); + firstChunkRead = true; + + // client write other data that MUSTN'T be read by the server, because + // is rate limited. + clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // on successful flush schedule a shutdown + ctx.channel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + group.shutdownGracefully(); + } + }, 2, TimeUnit.SECONDS); + } else { + ctx.fireExceptionCaught(future.cause()); + } + } + }); + + } else { + // the first read happened, no other reads are commanded by the server + // should never pass here + fail("Shouldn't never be notified other data while in rate limiting"); + } + } + + public void updateClient(Channel clientChannel) { + assertNotNull(clientChannel); + this.clientChannel = clientChannel; + } + } + + + private static class AssertionsHandler extends ChannelInboundHandlerAdapter { + + private final NioEventLoopGroup group; + + private Throwable lastError; + + public AssertionsHandler(NioEventLoopGroup group) { + this.group = group; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + lastError = cause; + group.shutdownGracefully(); + } + + public void assertNoErrors() { + if (lastError != null) { + if (lastError instanceof AssertionError) { + throw (AssertionError) lastError; + } else { + fail("Failed with error" + lastError); + } + } + } + } + + @Test + public void givenAChannelInNotWriteableStateWhenNewBuffersAreSentByClientThenNoDecodeTakePartOnServerSide() throws Exception { + final int highWaterMark = 32 * 1024; + FlowLimiterHandler sut = new FlowLimiterHandler(); + + NioEventLoopGroup group = new NioEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + + readMessagesCollector = new ReadMessagesCollector(group); + AssertionsHandler assertionsHandler = new AssertionsHandler(group); + try { + b.group(group) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.config().setWriteBufferHighWaterMark(highWaterMark); + ch.pipeline() + .addLast(onClientConnected(ctx -> { + // write as much to move the channel in not writable state + fillOutboundWatermark(ctx, highWaterMark); + // ask the client to send some data present on the channel + clientChannel.writeAndFlush(prepareSample(32)); + })) + .addLast(sut) + .addLast(readMessagesCollector) + .addLast(assertionsHandler); + } + }); + ChannelFuture future = b.bind("0.0.0.0", 1234).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + startAClient(group); + } + } + }).sync(); + future.channel().closeFuture().sync(); + } finally { + group.shutdownGracefully().sync(); + } + + assertionsHandler.assertNoErrors(); + } + + private static void fillOutboundWatermark(ChannelHandlerContext ctx, int highWaterMark) { + final ByteBuf payload = prepareSample(highWaterMark, 'C'); + while (ctx.channel().isWritable()) { + ctx.pipeline().writeAndFlush(payload.copy()); + } + } + + Channel clientChannel; + + private void startAClient(NioEventLoopGroup group) { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.config().setAutoRead(false); + clientChannel = ch; + readMessagesCollector.updateClient(clientChannel); + } + }); + b.connect("localhost", 1234); + } + +} From c707a85c14ada0260719ce4c8f30caad37c8bbb7 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 7 Nov 2023 12:29:48 +0100 Subject: [PATCH 2/3] Fixed potential buffer leak --- src/main/java/org/logstash/beats/BeatsParser.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 812150b1..5d02559e 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -199,9 +199,17 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_JSON: { logger.trace("Running: READ_JSON"); - ((V2Batch)batch).addMessage(sequence, in, requiredBytes); - if(batch.isComplete()) { - if(logger.isTraceEnabled()) { + try { + ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + } catch (Throwable th) { + // batch has to release its internal buffer before bubbling up the exception + batch.release(); + + // re throw the same error after released the internal buffer + throw th; + } + if (batch.isComplete()) { + if (logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); } out.add(batch); From 792f8680ef0a782c5ad39c9ff5864b480c17078c Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 7 Nov 2023 15:45:12 +0100 Subject: [PATCH 3/3] Moved the flow limiter handler as first element of the Netty chain --- src/main/java/org/logstash/beats/Server.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index ab55f1ba..15d66acd 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -127,6 +127,7 @@ private class BeatsInitializer extends ChannelInitializer { public void initChannel(SocketChannel socket){ ChannelPipeline pipeline = socket.pipeline(); + pipeline.addLast(new FlowLimiterHandler()); if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); } @@ -134,7 +135,6 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener)); }