Skip to content

Commit

Permalink
Merge pull request #1 from eperott/fix_buffer_overflow
Browse files Browse the repository at this point in the history
Force flush of nio buffer when threshold is reached
  • Loading branch information
lunarfs authored Sep 2, 2024
2 parents aa129bf + 7c738ed commit 7295fe8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,50 @@
package com.zegelin.prometheus.exposition;

import com.google.common.annotations.VisibleForTesting;

import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class FormattedByteChannel implements ReadableByteChannel {
public static final int MIN_CHUNK_SIZE = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = MIN_CHUNK_SIZE * 5;
public static final int DEFAULT_CHUNK_THRESHOLD = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = DEFAULT_CHUNK_THRESHOLD * 5;

private final FormattedExposition formattedExposition;
private final int chunkThreshold;

public FormattedByteChannel(final FormattedExposition formattedExposition) {
this(formattedExposition, DEFAULT_CHUNK_THRESHOLD);
}

public FormattedByteChannel(FormattedExposition formattedExposition) {
@VisibleForTesting
FormattedByteChannel(final FormattedExposition formattedExposition, final int chunkThreshold) {
this.formattedExposition = formattedExposition;
this.chunkThreshold = chunkThreshold;
}

@Override
public int read(ByteBuffer dst) {
public int read(final ByteBuffer dst) {
if (!isOpen()) {
return -1;
}

// Forcing the calling ChunkedNioStream to flush the buffer
if (hasBufferReachedChunkThreshold(dst)) {
return -1;
}

final NioExpositionSink sink = new NioExpositionSink(dst);
while (sink.getIngestedByteCount() < MIN_CHUNK_SIZE && isOpen()) {
while (!hasBufferReachedChunkThreshold(dst) && isOpen()) {
formattedExposition.nextSlice(sink);
}

return sink.getIngestedByteCount();
}

private boolean hasBufferReachedChunkThreshold(final ByteBuffer dst) {
return dst.position() >= chunkThreshold;
}

@Override
public boolean isOpen() {
return !formattedExposition.isEndOfInput();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.zegelin.prometheus.exposition;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedNioStream;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
Expand All @@ -8,49 +12,102 @@
import java.nio.ByteBuffer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

public class TestFormattedByteChannel {
@Mock
private FormattedExposition formattedExposition;
private ChannelHandlerContext ctx;

private ChunkedNioStream chunkedNioStream;

private TenSliceExposition formattedExposition;

private ByteBuffer buffer;
private FormattedByteChannel channel;

@BeforeMethod
public void before() {
MockitoAnnotations.initMocks(this);

buffer = ByteBuffer.allocate(128);
channel = new FormattedByteChannel(formattedExposition);
formattedExposition = new TenSliceExposition();
channel = new FormattedByteChannel(formattedExposition, 64);

when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
chunkedNioStream = new ChunkedNioStream(channel, 128);
}

@Test
public void testClosed() {
when(formattedExposition.isEndOfInput()).thenReturn(true);
formattedExposition.setSlices(0);

assertThat(channel.read(buffer)).isEqualTo(-1);
assertThat(channel.isOpen()).isEqualTo(false);
}

@Test
public void testOpen() {
when(formattedExposition.isEndOfInput()).thenReturn(false);
formattedExposition.setSlices(1);

assertThat(channel.isOpen()).isEqualTo(true);
}

@Test
public void testOneChunk() {
when(formattedExposition.isEndOfInput()).thenReturn(false).thenReturn(false).thenReturn(true);
doAnswer(invocation -> {
NioExpositionSink sink = invocation.getArgument(0);
public void testOneSlice() throws Exception {
formattedExposition.setSlices(1);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(10);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoSlices() throws Exception {
formattedExposition.setSlices(2);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(20);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoChunks() throws Exception {
formattedExposition.setSlices(10);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(70);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(false);

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(30);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

// A dummy Exposition implementation that will generate a specific number of slices of size 10.
private static class TenSliceExposition implements FormattedExposition {
private int slices = 0;
private int currentSlice = 0;

private void setSlices(final int chunks) {
this.slices = chunks;
}

@Override
public void nextSlice(final ExpositionSink<?> sink) {
if (isEndOfInput()) {
return;
}

currentSlice++;
sink.writeAscii("abcdefghij");
return null;
}).when(formattedExposition).nextSlice(any(NioExpositionSink.class));
}

assertThat(channel.read(buffer)).isEqualTo(10);
assertThat(channel.isOpen()).isEqualTo(false);
@Override
public boolean isEndOfInput() {
return currentSlice >= slices;
}
}
}

0 comments on commit 7295fe8

Please sign in to comment.