Skip to content

Commit

Permalink
Fix code style inconsistencies
Browse files Browse the repository at this point in the history
  • Loading branch information
rchomczyk committed Oct 28, 2024
1 parent 2581849 commit 206bb5c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package dev.shiza.lavender.broker;

import dev.shiza.lavender.codec.Packet;
import org.jetbrains.annotations.ApiStatus.Internal;

@Internal
public abstract class PacketBaseBroker<T> implements PacketBroker<T> {

@Override
public void delegateToPacketBroker(final Packet request, final Packet response)
throws PacketOrchestrationException {
if (request.getTarget() == null) {
throw new PacketOrchestrationException(
"Could not delegate packet to packet broker due to missing target.");
}

publish(request.getTarget(), response);
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package dev.shiza.lavender.broker;

import dev.shiza.lavender.codec.Packet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public interface PacketOrchestrator<T> {

/**
* Delegates packet to event bus.
*
* @return consumer for packet delegation
* @param message message to be delegated
* @throws PacketOrchestrationException if packet could not be delegated to event bus
*/
Consumer<T> delegateToEventBus() throws PacketOrchestrationException;
void delegateToEventBus(final T message) throws PacketOrchestrationException;

/**
* Delegates packet to packet broker.
*
* @return consumer for packet delegation
* @param request packet sent as a request
* @param response packet to be sent as a response
* @throws PacketOrchestrationException if packet could not be delegated to packet broker
*/
BiConsumer<Packet, Packet> delegateToPacketBroker() throws PacketOrchestrationException;
void delegateToPacketBroker(final Packet request, final Packet response)
throws PacketOrchestrationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import dev.shiza.dew.event.EventBus;
import dev.shiza.dew.event.EventPublishingException;
import dev.shiza.dew.result.ResultHandler;
import dev.shiza.dew.subscription.Subscriber;
import dev.shiza.dew.subscription.SubscribingException;
import dev.shiza.lavender.broker.ChannelObservingException;
import dev.shiza.lavender.broker.ChannelPublishingException;
import dev.shiza.lavender.broker.PacketBaseBroker;
import dev.shiza.lavender.broker.PacketBrokerClosingException;
import dev.shiza.lavender.broker.PacketOrchestrationException;
import dev.shiza.lavender.codec.Packet;
Expand All @@ -15,10 +15,8 @@
import io.nats.client.Message;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class NatsPacketBrokerImpl implements NatsPacketBroker {
final class NatsPacketBrokerImpl extends PacketBaseBroker<Message> implements NatsPacketBroker {

private final String identity;
private final PacketCodec packetCodec;
Expand All @@ -36,10 +34,7 @@ final class NatsPacketBrokerImpl implements NatsPacketBroker {
this.packetCodec = packetCodec;
this.eventBus = eventBus;
this.eventBus.publisher(Runnable::run);
this.eventBus.result(
Packet.class,
(ResultHandler<Packet, Packet>)
(request, response) -> delegateToPacketBroker().accept(request, response));
this.eventBus.result(Packet.class, this::delegateToPacketBroker);
this.requestTtl = requestTtl;
this.connection = connection;
}
Expand Down Expand Up @@ -74,36 +69,22 @@ public <T extends Packet> CompletableFuture<T> request(
}

@Override
public Consumer<Message> delegateToEventBus() throws PacketOrchestrationException {
return message -> {
final Packet packet = packetCodec.decoder().decode(message.getData());
if (packet.getSource().equals(identity)) {
return;
}

if (message.getReplyTo() != null) {
packet.setTarget(message.getReplyTo());
}

try {
eventBus.publish(packet, message.getSubject());
} catch (final EventPublishingException exception) {
throw new PacketOrchestrationException(
"Could not delegate packet to event bus due to unexpected exception.", exception);
}
};
}
public void delegateToEventBus(final Message message) throws PacketOrchestrationException {
final Packet packet = packetCodec.decoder().decode(message.getData());
if (packet.getSource().equals(identity)) {
return;
}

@Override
public BiConsumer<Packet, Packet> delegateToPacketBroker() throws PacketOrchestrationException {
return (request, response) -> {
if (request.getTarget() == null) {
throw new PacketOrchestrationException(
"Could not delegate packet to packet broker due to missing target.");
}
if (message.getReplyTo() != null) {
packet.setTarget(message.getReplyTo());
}

publish(request.getTarget(), response);
};
try {
eventBus.publish(packet, message.getSubject());
} catch (final EventPublishingException exception) {
throw new PacketOrchestrationException(
"Could not delegate packet to event bus due to unexpected exception.", exception);
}
}

@Override
Expand All @@ -129,9 +110,7 @@ private void observeEventBus(final Subscriber subscriber) {

private void observePacketBroker(final Subscriber subscriber) {
try {
connection
.createDispatcher(message -> delegateToEventBus().accept(message))
.subscribe(subscriber.identity());
connection.createDispatcher(this::delegateToEventBus).subscribe(subscriber.identity());
} catch (final IllegalStateException exception) {
throw new ChannelObservingException(
"Could not create dispatcher on channel named %s due to unexpected exception."
Expand Down

0 comments on commit 206bb5c

Please sign in to comment.