diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index a1c15ea6c6d..00f20da8cae 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -23,29 +23,26 @@ public abstract class HandlerRegistration implements Closeable { public final ContextInternal context; - public final EventBusImpl bus; + public final EventBusImpl eventBus; public final String address; public final boolean src; private HandlerHolder registered; - private Object metric; + Object metric; - HandlerRegistration(ContextInternal context, - EventBusImpl bus, - String address, - boolean src) { + HandlerRegistration(ContextInternal context, EventBusImpl bus, String address, boolean src) { this.context = context; - this.bus = bus; + this.eventBus = bus; this.src = src; this.address = address; } void receive(MessageImpl msg) { - if (bus.metrics != null) { - bus.metrics.scheduleMessage(metric, msg.isLocal()); + if (eventBus.metrics != null) { + eventBus.metrics.scheduleMessage(metric, msg.isLocal()); } context.executor().execute(() -> { - // Need to check handler is still there - the handler might have been removed after the message were sent but - // before it was received + // Need to check handler is still there: + // the handler might have been removed AFTER the message was sent BUT BEFORE it was received if (!doReceive(msg)) { discard(msg); } @@ -58,11 +55,11 @@ void receive(MessageImpl msg) { synchronized void register(String repliedAddress, boolean localOnly, Promise promise) { if (registered != null) { - throw new IllegalStateException(); + throw new IllegalStateException("already registered: "+repliedAddress+" - "+registered); } - registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise); - if (bus.metrics != null) { - metric = bus.metrics.handlerRegistered(address, repliedAddress); + registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly, promise); + if (eventBus.metrics != null) { + metric = eventBus.metrics.handlerRegistered(address, repliedAddress); } } @@ -74,10 +71,10 @@ public Future unregister() { Promise promise = context.promise(); synchronized (this) { if (registered != null) { - bus.removeRegistration(registered, promise); + eventBus.removeRegistration(registered, promise); registered = null; - if (bus.metrics != null) { - bus.metrics.handlerUnregistered(metric); + if (eventBus.metrics != null) { + eventBus.metrics.handlerUnregistered(metric); } } else { promise.complete(); @@ -86,14 +83,14 @@ public Future unregister() { return promise.future(); } - void dispatch(Handler> theHandler, Message message, ContextInternal context) { + final void dispatchUsingInboundDeliveryContext (Handler> theHandler, Message message, ContextInternal context) { InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl) message, theHandler, context); deliveryCtx.dispatch(); } void discard(Message msg) { - if (bus.metrics != null) { - bus.metrics.discardMessage(metric, ((MessageImpl)msg).isLocal(), msg); + if (eventBus.metrics != null) { + eventBus.metrics.discardMessage(metric, ((MessageImpl)msg).isLocal(), msg); } String replyAddress = msg.replyAddress(); @@ -102,7 +99,7 @@ void discard(Message msg) { } } - private class InboundDeliveryContext extends DeliveryContextBase { + private final class InboundDeliveryContext extends DeliveryContextBase { private final Handler> handler; @@ -112,12 +109,13 @@ private InboundDeliveryContext(MessageImpl message, Handler> ha this.handler = handler; } + @Override protected void execute() { ContextInternal ctx = InboundDeliveryContext.super.context; Object m = metric; VertxTracer tracer = ctx.tracer(); - if (bus.metrics != null) { - bus.metrics.messageDelivered(m, message.isLocal()); + if (eventBus.metrics != null) { + eventBus.metrics.messageDelivered(m, message.isLocal()); } if (tracer != null && !src) { message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE); @@ -146,4 +144,8 @@ public Object body() { public void close(Promise completion) { unregister().onComplete(completion); } + + public final Vertx vertx() { + return eventBus.vertx; + } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 843ad9cd9a4..ffac3c06ba3 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -35,7 +35,7 @@ public class MessageConsumerImpl extends HandlerRegistration implements Me private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class); - private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000; + public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000; private final boolean localOnly; private Handler> handler; @@ -124,7 +124,7 @@ public synchronized Future unregister() { registered = false; Promise res = result; // Alias reference because result can become null when the onComplete callback executes fut.onComplete(ar -> res.tryFail("Consumer unregistered before registration completed")); - result = context.promise(); + result = context.promise();// old result is-Complete or will be-Complete shortly } return fut; } @@ -173,7 +173,7 @@ protected void dispatch(Message msg, ContextInternal context, Handler> theHandler, Message message) { // Handle the message outside the sync block // https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714 - dispatch(theHandler, message, context.duplicate()); + dispatchUsingInboundDeliveryContext(theHandler, message, context.duplicate()); checkNextTick(); } @@ -247,9 +247,7 @@ public MessageConsumer resume() { @Override public synchronized MessageConsumer fetch(long amount) { - if (amount < 0) { - throw new IllegalArgumentException(); - } + Arguments.require(amount >= 0, "fetch(amount) must be positive, but: "+amount); demand += amount; if (demand < 0L) { demand = Long.MAX_VALUE; @@ -264,7 +262,7 @@ public synchronized MessageConsumer fetch(long amount) { public synchronized MessageConsumer endHandler(Handler endHandler) { if (endHandler != null) { // We should use the HandlerHolder context to properly do this (needs small refactoring) - Context endCtx = bus.vertx.getOrCreateContext(); + Context endCtx = vertx().getOrCreateContext(); this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null)); } else { this.endHandler = null; diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index 6286348f867..2fdc37dfdb9 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -50,7 +50,7 @@ Future> result() { } void fail(ReplyException failure) { - if (bus.vertx.cancelTimer(timeoutID)) { + if (vertx().cancelTimer(timeoutID)) { unregister(); doFail(failure); } @@ -59,8 +59,8 @@ void fail(ReplyException failure) { private void doFail(ReplyException failure) { trace(null, failure); result.fail(failure); - if (bus.metrics != null) { - bus.metrics.replyFailure(repliedAddress, failure.failureType()); + if (eventBus.metrics != null) { + eventBus.metrics.replyFailure(repliedAddress, failure.failureType()); } } @@ -72,7 +72,7 @@ public void handle(Long id) { @Override protected boolean doReceive(Message reply) { - dispatch(null, reply, context); + dispatchUsingInboundDeliveryContext(null, reply, context); return true; } @@ -82,7 +82,7 @@ void register() { @Override protected void dispatch(Message reply, ContextInternal context, Handler> handler /* null */) { - if (bus.vertx.cancelTimer(timeoutID)) { + if (vertx().cancelTimer(timeoutID)) { unregister(); if (reply.body() instanceof ReplyException) { doFail((ReplyException) reply.body());