Skip to content

Commit

Permalink
Replace use of deprecated DirectProcessor
Browse files Browse the repository at this point in the history
Signed-off-by: Saai Syvendra (Github key) <[email protected]>
  • Loading branch information
saai-syvendra committed Oct 31, 2024
1 parent f2c78f3 commit 722cb6c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,11 @@ public abstract class SharedTopicListener implements TopicListener {
@Override
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
Sinks.Many<TopicMessage> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<TopicMessage> overflowProcessor = sink.asFlux();

// moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40%
Flux<TopicMessage> topicMessageFlux = getSharedListener(filter)
return getSharedListener(filter)
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(
listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow()))
.doFinally(s -> sink.tryEmitComplete());

return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor)
.onBackpressureBuffer(listenerProperties.getMaxBufferSize(), BufferOverflowStrategy.ERROR)
.doFinally(s -> sink.tryEmitComplete())
.publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void slowSubscriberOverflowException() {
.expectNext(1L, 2L)
.thenAwait(Duration.ofMillis(500L)) // stall to overrun backpressure buffer
.thenRequest(Long.MAX_VALUE)
.thenConsumeWhile(n -> n < numMessages)
.expectNextCount(maxBufferSize)
.expectErrorMatches(Exceptions::isOverflow)
.verify(Duration.ofMillis(1000L));

Expand Down

0 comments on commit 722cb6c

Please sign in to comment.