Skip to content

Commit

Permalink
code review: rename but to eventBus in (parent) HandlerRegistration; …
Browse files Browse the repository at this point in the history
…better inheritance; confusing method name
  • Loading branch information
magicprinc committed Sep 15, 2023
1 parent 658d643 commit 4992783
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
50 changes: 26 additions & 24 deletions src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,26 @@
public abstract class HandlerRegistration<T> implements Closeable {

public final ContextInternal context;
public final EventBusImpl bus;
public final EventBusImpl eventBus;
public final String address;
public final boolean src;
private HandlerHolder<T> registered;
private Object metric;
Object metric;

HandlerRegistration(ContextInternal context,
EventBusImpl bus,
String address,
boolean src) {
HandlerRegistration(ContextInternal context, EventBusImpl bus, String address, boolean src) {
this.context = context;
this.bus = bus;
this.eventBus = bus;
this.src = src;
this.address = address;
}

void receive(MessageImpl msg) {
if (bus.metrics != null) {
bus.metrics.scheduleMessage(metric, msg.isLocal());
if (eventBus.metrics != null) {
eventBus.metrics.scheduleMessage(metric, msg.isLocal());
}
context.executor().execute(() -> {
// Need to check handler is still there - the handler might have been removed after the message were sent but
// before it was received
// Need to check handler is still there:
// the handler might have been removed AFTER the message was sent BUT BEFORE it was received
if (!doReceive(msg)) {
discard(msg);
}
Expand All @@ -58,11 +55,11 @@ void receive(MessageImpl msg) {

synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
if (registered != null) {
throw new IllegalStateException();
throw new IllegalStateException("already registered: "+repliedAddress+" - "+registered);
}
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
if (bus.metrics != null) {
metric = bus.metrics.handlerRegistered(address, repliedAddress);
registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
if (eventBus.metrics != null) {
metric = eventBus.metrics.handlerRegistered(address, repliedAddress);
}
}

Expand All @@ -74,10 +71,10 @@ public Future<Void> unregister() {
Promise<Void> promise = context.promise();
synchronized (this) {
if (registered != null) {
bus.removeRegistration(registered, promise);
eventBus.removeRegistration(registered, promise);
registered = null;
if (bus.metrics != null) {
bus.metrics.handlerUnregistered(metric);
if (eventBus.metrics != null) {
eventBus.metrics.handlerUnregistered(metric);
}
} else {
promise.complete();
Expand All @@ -86,14 +83,14 @@ public Future<Void> unregister() {
return promise.future();
}

void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
final void dispatchUsingInboundDeliveryContext (Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
deliveryCtx.dispatch();
}

void discard(Message<T> msg) {
if (bus.metrics != null) {
bus.metrics.discardMessage(metric, ((MessageImpl)msg).isLocal(), msg);
if (eventBus.metrics != null) {
eventBus.metrics.discardMessage(metric, ((MessageImpl)msg).isLocal(), msg);
}

String replyAddress = msg.replyAddress();
Expand All @@ -102,7 +99,7 @@ void discard(Message<T> msg) {
}
}

private class InboundDeliveryContext extends DeliveryContextBase<T> {
private final class InboundDeliveryContext extends DeliveryContextBase<T> {

private final Handler<Message<T>> handler;

Expand All @@ -112,12 +109,13 @@ private InboundDeliveryContext(MessageImpl<?, T> message, Handler<Message<T>> ha
this.handler = handler;
}

@Override
protected void execute() {
ContextInternal ctx = InboundDeliveryContext.super.context;
Object m = metric;
VertxTracer tracer = ctx.tracer();
if (bus.metrics != null) {
bus.metrics.messageDelivered(m, message.isLocal());
if (eventBus.metrics != null) {
eventBus.metrics.messageDelivered(m, message.isLocal());
}
if (tracer != null && !src) {
message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE);
Expand Down Expand Up @@ -146,4 +144,8 @@ public Object body() {
public void close(Promise<Void> completion) {
unregister().onComplete(completion);
}

public final Vertx vertx() {
return eventBus.vertx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements Me

private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);

private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;

private final boolean localOnly;
private Handler<Message<T>> handler;
Expand Down Expand Up @@ -124,7 +124,7 @@ public synchronized Future<Void> unregister() {
registered = false;
Promise<Void> res = result; // Alias reference because result can become null when the onComplete callback executes
fut.onComplete(ar -> res.tryFail("Consumer unregistered before registration completed"));
result = context.promise();
result = context.promise();// old result is-Complete or will be-Complete shortly
}
return fut;
}
Expand Down Expand Up @@ -173,7 +173,7 @@ protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
dispatch(theHandler, message, context.duplicate());
dispatchUsingInboundDeliveryContext(theHandler, message, context.duplicate());
checkNextTick();
}

Expand Down Expand Up @@ -247,9 +247,7 @@ public MessageConsumer<T> resume() {

@Override
public synchronized MessageConsumer<T> fetch(long amount) {
if (amount < 0) {
throw new IllegalArgumentException();
}
Arguments.require(amount >= 0, "fetch(amount) must be positive, but: "+amount);
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
Expand All @@ -264,7 +262,7 @@ public synchronized MessageConsumer<T> fetch(long amount) {
public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
if (endHandler != null) {
// We should use the HandlerHolder context to properly do this (needs small refactoring)
Context endCtx = bus.vertx.getOrCreateContext();
Context endCtx = vertx().getOrCreateContext();
this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
} else {
this.endHandler = null;
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Future<Message<T>> result() {
}

void fail(ReplyException failure) {
if (bus.vertx.cancelTimer(timeoutID)) {
if (vertx().cancelTimer(timeoutID)) {
unregister();
doFail(failure);
}
Expand All @@ -59,8 +59,8 @@ void fail(ReplyException failure) {
private void doFail(ReplyException failure) {
trace(null, failure);
result.fail(failure);
if (bus.metrics != null) {
bus.metrics.replyFailure(repliedAddress, failure.failureType());
if (eventBus.metrics != null) {
eventBus.metrics.replyFailure(repliedAddress, failure.failureType());
}
}

Expand All @@ -72,7 +72,7 @@ public void handle(Long id) {

@Override
protected boolean doReceive(Message<T> reply) {
dispatch(null, reply, context);
dispatchUsingInboundDeliveryContext(null, reply, context);
return true;
}

Expand All @@ -82,7 +82,7 @@ void register() {

@Override
protected void dispatch(Message<T> reply, ContextInternal context, Handler<Message<T>> handler /* null */) {
if (bus.vertx.cancelTimer(timeoutID)) {
if (vertx().cancelTimer(timeoutID)) {
unregister();
if (reply.body() instanceof ReplyException) {
doFail((ReplyException) reply.body());
Expand Down

0 comments on commit 4992783

Please sign in to comment.