Skip to content

Commit

Permalink
Merge from master
Browse files Browse the repository at this point in the history
  • Loading branch information
Palmr committed Dec 14, 2022
1 parent 256e4f8 commit e5ea360
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 32 deletions.
9 changes: 5 additions & 4 deletions src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class BatchEventProcessor<T>
private static final int IDLE = 0;
private static final int HALTED = IDLE + 1;
private static final int RUNNING = HALTED + 1;
private static final int DEFAULT_MAX_BATCH_SIZE = Integer.MAX_VALUE;

private final AtomicInteger running = new AtomicInteger(IDLE);
private ExceptionHandler<? super T> exceptionHandler;
Expand Down Expand Up @@ -124,7 +125,7 @@ public BatchEventProcessor(
final EventHandler<? super T> eventHandler
)
{
this(dataProvider, sequenceBarrier, eventHandler, Integer.MAX_VALUE);
this(dataProvider, sequenceBarrier, eventHandler, DEFAULT_MAX_BATCH_SIZE);
}

/**
Expand All @@ -141,7 +142,7 @@ public BatchEventProcessor(
final RewindableEventHandler<? super T> rewindableEventHandler
)
{
this(dataProvider, sequenceBarrier, rewindableEventHandler, Integer.MAX_VALUE);
this(dataProvider, sequenceBarrier, rewindableEventHandler, DEFAULT_MAX_BATCH_SIZE);
}

@Override
Expand Down Expand Up @@ -251,9 +252,9 @@ private void processEvents()
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence);

if (endOfBatchSequence >= nextSequence)
if (nextSequence <= endOfBatchSequence)
{
eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1);
eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1);
}

while (nextSequence <= endOfBatchSequence)
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/lmax/disruptor/EventHandlerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ interface EventHandlerBase<T>
* Invoked by {@link BatchEventProcessor} prior to processing a batch of events
*
* @param batchSize the size of the batch that is starting
* @param queueDepth the total number of queued up events including the batch about to be processed
*/
default void onBatchStart(long batchSize)
default void onBatchStart(long batchSize, long queueDepth)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void reset(final CountDownLatch latch, final long expectedCount)
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
batchesProcessed.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void reset(final CountDownLatch latch, final long expectedCount)
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
batchesProcessed.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void onEvent(final long[] event, final long sequence, final boolean endOf
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
batchesProcessed.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void onEvent(final ValueEvent event, final long sequence, final boolean e
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
batchesProcessed.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void onEvent(final ValueEvent event, final long sequence, final boolean e
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
batchesProcessed.increment();
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ final class LoopbackEventHandler
{

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
batchSizes.add(batchSize);
}
Expand Down Expand Up @@ -364,7 +364,7 @@ public void onEvent(final StubEvent event, final long sequence, final boolean en
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
final Integer currentCount = batchSizeToCountMap.get(batchSize);
final int nextCount = null == currentCount ? 1 : currentCount + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,18 @@ void setUp()
@Test
public void shouldLimitTheBatchToConfiguredMaxBatchSize() throws Exception
{
long sequence = 0;
for (int i = 0; i < PUBLISH_COUNT; i++)
{
sequence = ringBuffer.next();
}
ringBuffer.publish(sequence);

//Wait for consumer to process all events
countDownLatch.await();
publishEvents();

assertEquals(eventHandler.batchedSequences, Arrays.asList(Arrays.asList(0L, 1L, 2L), Arrays.asList(3L, 4L)));
}

@Test
public void shouldAnnounceBatchSizeAtTheStartOfBatch() throws Exception
public void shouldAnnounceBatchSizeAndQueueDepthAtTheStartOfBatch() throws Exception
{
long sequence = 0;
for (int i = 0; i < PUBLISH_COUNT; i++)
{
sequence = ringBuffer.next();
}
ringBuffer.publish(sequence);

//Wait for consumer to process all events
countDownLatch.await();
publishEvents();

assertEquals(eventHandler.announcedBatchSizes, Arrays.asList(3L, 2L));
assertEquals(eventHandler.announcedQueueDepths, Arrays.asList(5L, 2L));
}

@AfterEach
Expand All @@ -93,12 +78,26 @@ void tearDown() throws InterruptedException
thread.join();
}

private void publishEvents() throws InterruptedException
{
long sequence = 0;
for (int i = 0; i < PUBLISH_COUNT; i++)
{
sequence = ringBuffer.next();
}
ringBuffer.publish(sequence);

//Wait for consumer to process all events
countDownLatch.await();
}

private static class BatchLimitRecordingHandler implements EventHandler<StubEvent>
{
public final List<List<Long>> batchedSequences = new ArrayList<>();
private List<Long> currentSequences;
private final CountDownLatch countDownLatch;
private final List<Long> announcedBatchSizes = new ArrayList<>();
private final List<Long> announcedQueueDepths = new ArrayList<>();

BatchLimitRecordingHandler(final CountDownLatch countDownLatch)
{
Expand All @@ -119,10 +118,11 @@ public void onEvent(final StubEvent event, final long sequence, final boolean en
}

@Override
public void onBatchStart(final long batchSize)
public void onBatchStart(final long batchSize, final long queueDepth)
{
currentSequences = new ArrayList<>();
announcedBatchSizes.add(batchSize);
announcedQueueDepths.add(queueDepth);
}
}
}

0 comments on commit e5ea360

Please sign in to comment.