From 26d17fa9319c5c980e7438aff079ded600aeaafe Mon Sep 17 00:00:00 2001 From: "Saai Syvendra (Github key)" Date: Thu, 24 Oct 2024 06:46:29 +0530 Subject: [PATCH] Replace use of deprecated DirectProcessor Signed-off-by: Saai Syvendra (Github key) --- .../mirror/grpc/listener/SharedTopicListener.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java index db81aa93cfd..b126b45be12 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java @@ -22,9 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; @RequiredArgsConstructor @@ -34,17 +33,17 @@ public abstract class SharedTopicListener implements TopicListener { protected final ListenerProperties listenerProperties; @Override - @SuppressWarnings("deprecation") public Flux listen(TopicMessageFilter filter) { - DirectProcessor overflowProcessor = DirectProcessor.create(); - FluxSink overflowSink = overflowProcessor.sink(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + Flux overflowProcessor = sink.asFlux(); // moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40% Flux topicMessageFlux = getSharedListener(filter) .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) .onBackpressureBuffer( - listenerProperties.getMaxBufferSize(), t -> overflowSink.error(Exceptions.failWithOverflow())) - .doFinally(s -> overflowSink.complete()); + listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow())) + .doFinally(s -> sink.tryEmitComplete()); + return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor) .publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch()); }