diff --git a/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketBaseBroker.java b/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketBaseBroker.java new file mode 100644 index 0000000..25b4886 --- /dev/null +++ b/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketBaseBroker.java @@ -0,0 +1,19 @@ +package dev.shiza.lavender.broker; + +import dev.shiza.lavender.codec.Packet; +import org.jetbrains.annotations.ApiStatus.Internal; + +@Internal +public abstract class PacketBaseBroker implements PacketBroker { + + @Override + public void delegateToPacketBroker(final Packet request, final Packet response) + throws PacketOrchestrationException { + if (request.getTarget() == null) { + throw new PacketOrchestrationException( + "Could not delegate packet to packet broker due to missing target."); + } + + publish(request.getTarget(), response); + } +} diff --git a/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketOrchestrator.java b/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketOrchestrator.java index a3c27f0..8d6a075 100644 --- a/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketOrchestrator.java +++ b/lavender-brokers/lavender-broker-common/src/dev/shiza/lavender/broker/PacketOrchestrator.java @@ -1,24 +1,24 @@ package dev.shiza.lavender.broker; import dev.shiza.lavender.codec.Packet; -import java.util.function.BiConsumer; -import java.util.function.Consumer; public interface PacketOrchestrator { /** * Delegates packet to event bus. * - * @return consumer for packet delegation + * @param message message to be delegated * @throws PacketOrchestrationException if packet could not be delegated to event bus */ - Consumer delegateToEventBus() throws PacketOrchestrationException; + void delegateToEventBus(final T message) throws PacketOrchestrationException; /** * Delegates packet to packet broker. * - * @return consumer for packet delegation + * @param request packet sent as a request + * @param response packet to be sent as a response * @throws PacketOrchestrationException if packet could not be delegated to packet broker */ - BiConsumer delegateToPacketBroker() throws PacketOrchestrationException; + void delegateToPacketBroker(final Packet request, final Packet response) + throws PacketOrchestrationException; } diff --git a/lavender-brokers/lavender-broker-nats/src/dev/shiza/lavender/broker/nats/NatsPacketBrokerImpl.java b/lavender-brokers/lavender-broker-nats/src/dev/shiza/lavender/broker/nats/NatsPacketBrokerImpl.java index 65b1179..fd39289 100644 --- a/lavender-brokers/lavender-broker-nats/src/dev/shiza/lavender/broker/nats/NatsPacketBrokerImpl.java +++ b/lavender-brokers/lavender-broker-nats/src/dev/shiza/lavender/broker/nats/NatsPacketBrokerImpl.java @@ -2,11 +2,11 @@ import dev.shiza.dew.event.EventBus; import dev.shiza.dew.event.EventPublishingException; -import dev.shiza.dew.result.ResultHandler; import dev.shiza.dew.subscription.Subscriber; import dev.shiza.dew.subscription.SubscribingException; import dev.shiza.lavender.broker.ChannelObservingException; import dev.shiza.lavender.broker.ChannelPublishingException; +import dev.shiza.lavender.broker.PacketBaseBroker; import dev.shiza.lavender.broker.PacketBrokerClosingException; import dev.shiza.lavender.broker.PacketOrchestrationException; import dev.shiza.lavender.codec.Packet; @@ -15,10 +15,8 @@ import io.nats.client.Message; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -final class NatsPacketBrokerImpl implements NatsPacketBroker { +final class NatsPacketBrokerImpl extends PacketBaseBroker implements NatsPacketBroker { private final String identity; private final PacketCodec packetCodec; @@ -36,10 +34,7 @@ final class NatsPacketBrokerImpl implements NatsPacketBroker { this.packetCodec = packetCodec; this.eventBus = eventBus; this.eventBus.publisher(Runnable::run); - this.eventBus.result( - Packet.class, - (ResultHandler) - (request, response) -> delegateToPacketBroker().accept(request, response)); + this.eventBus.result(Packet.class, this::delegateToPacketBroker); this.requestTtl = requestTtl; this.connection = connection; } @@ -74,36 +69,22 @@ public CompletableFuture request( } @Override - public Consumer delegateToEventBus() throws PacketOrchestrationException { - return message -> { - final Packet packet = packetCodec.decoder().decode(message.getData()); - if (packet.getSource().equals(identity)) { - return; - } - - if (message.getReplyTo() != null) { - packet.setTarget(message.getReplyTo()); - } - - try { - eventBus.publish(packet, message.getSubject()); - } catch (final EventPublishingException exception) { - throw new PacketOrchestrationException( - "Could not delegate packet to event bus due to unexpected exception.", exception); - } - }; - } + public void delegateToEventBus(final Message message) throws PacketOrchestrationException { + final Packet packet = packetCodec.decoder().decode(message.getData()); + if (packet.getSource().equals(identity)) { + return; + } - @Override - public BiConsumer delegateToPacketBroker() throws PacketOrchestrationException { - return (request, response) -> { - if (request.getTarget() == null) { - throw new PacketOrchestrationException( - "Could not delegate packet to packet broker due to missing target."); - } + if (message.getReplyTo() != null) { + packet.setTarget(message.getReplyTo()); + } - publish(request.getTarget(), response); - }; + try { + eventBus.publish(packet, message.getSubject()); + } catch (final EventPublishingException exception) { + throw new PacketOrchestrationException( + "Could not delegate packet to event bus due to unexpected exception.", exception); + } } @Override @@ -129,9 +110,7 @@ private void observeEventBus(final Subscriber subscriber) { private void observePacketBroker(final Subscriber subscriber) { try { - connection - .createDispatcher(message -> delegateToEventBus().accept(message)) - .subscribe(subscriber.identity()); + connection.createDispatcher(this::delegateToEventBus).subscribe(subscriber.identity()); } catch (final IllegalStateException exception) { throw new ChannelObservingException( "Could not create dispatcher on channel named %s due to unexpected exception."