Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force flush of nio buffer when threshold is reached #1

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,32 +1,50 @@
package com.zegelin.prometheus.exposition;

import com.google.common.annotations.VisibleForTesting;

import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class FormattedByteChannel implements ReadableByteChannel {
public static final int MIN_CHUNK_SIZE = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = MIN_CHUNK_SIZE * 5;
public static final int DEFAULT_CHUNK_THRESHOLD = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = DEFAULT_CHUNK_THRESHOLD * 5;

private final FormattedExposition formattedExposition;
private final int chunkThreshold;

public FormattedByteChannel(final FormattedExposition formattedExposition) {
this(formattedExposition, DEFAULT_CHUNK_THRESHOLD);
}

public FormattedByteChannel(FormattedExposition formattedExposition) {
@VisibleForTesting
FormattedByteChannel(final FormattedExposition formattedExposition, final int chunkThreshold) {
this.formattedExposition = formattedExposition;
this.chunkThreshold = chunkThreshold;
}

@Override
public int read(ByteBuffer dst) {
public int read(final ByteBuffer dst) {
if (!isOpen()) {
return -1;
}

// Forcing the calling ChunkedNioStream to flush the buffer
if (hasBufferReachedChunkThreshold(dst)) {
return -1;
}

final NioExpositionSink sink = new NioExpositionSink(dst);
while (sink.getIngestedByteCount() < MIN_CHUNK_SIZE && isOpen()) {
while (!hasBufferReachedChunkThreshold(dst) && isOpen()) {
formattedExposition.nextSlice(sink);
}

return sink.getIngestedByteCount();
}

private boolean hasBufferReachedChunkThreshold(final ByteBuffer dst) {
return dst.position() >= chunkThreshold;
}

@Override
public boolean isOpen() {
return !formattedExposition.isEndOfInput();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.zegelin.prometheus.exposition;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedNioStream;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
Expand All @@ -8,49 +12,102 @@
import java.nio.ByteBuffer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

public class TestFormattedByteChannel {
@Mock
private FormattedExposition formattedExposition;
private ChannelHandlerContext ctx;

private ChunkedNioStream chunkedNioStream;

private TenSliceExposition formattedExposition;

private ByteBuffer buffer;
private FormattedByteChannel channel;

@BeforeMethod
public void before() {
MockitoAnnotations.initMocks(this);

buffer = ByteBuffer.allocate(128);
channel = new FormattedByteChannel(formattedExposition);
formattedExposition = new TenSliceExposition();
channel = new FormattedByteChannel(formattedExposition, 64);

when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
chunkedNioStream = new ChunkedNioStream(channel, 128);
}

@Test
public void testClosed() {
when(formattedExposition.isEndOfInput()).thenReturn(true);
formattedExposition.setSlices(0);

assertThat(channel.read(buffer)).isEqualTo(-1);
assertThat(channel.isOpen()).isEqualTo(false);
}

@Test
public void testOpen() {
when(formattedExposition.isEndOfInput()).thenReturn(false);
formattedExposition.setSlices(1);

assertThat(channel.isOpen()).isEqualTo(true);
}

@Test
public void testOneChunk() {
when(formattedExposition.isEndOfInput()).thenReturn(false).thenReturn(false).thenReturn(true);
doAnswer(invocation -> {
NioExpositionSink sink = invocation.getArgument(0);
public void testOneSlice() throws Exception {
formattedExposition.setSlices(1);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(10);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoSlices() throws Exception {
formattedExposition.setSlices(2);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(20);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoChunks() throws Exception {
formattedExposition.setSlices(10);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(70);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(false);

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(30);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

// A dummy Exposition implementation that will generate a specific number of slices of size 10.
private static class TenSliceExposition implements FormattedExposition {
private int slices = 0;
private int currentSlice = 0;

private void setSlices(final int chunks) {
this.slices = chunks;
}

@Override
public void nextSlice(final ExpositionSink<?> sink) {
if (isEndOfInput()) {
return;
}

currentSlice++;
sink.writeAscii("abcdefghij");
return null;
}).when(formattedExposition).nextSlice(any(NioExpositionSink.class));
}

assertThat(channel.read(buffer)).isEqualTo(10);
assertThat(channel.isOpen()).isEqualTo(false);
@Override
public boolean isEndOfInput() {
return currentSlice >= slices;
}
}
}