diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index bdbe44128d7..d9194e559b2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -258,9 +258,9 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, return msg; } - protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { + protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean broadcast, boolean localOnly, Promise promise) { HandlerHolder holder = addLocalRegistration(address, registration, localOnly); - if (!replyHandler) { + if (broadcast) { onLocalRegistration(holder, promise); } else { if (promise != null) { @@ -268,7 +268,7 @@ protected Consumer> addRegistration(String address, HandlerReg } } return p -> { - removeRegistration(holder, replyHandler, p); + removeRegistration(holder, broadcast, p); }; } @@ -303,9 +303,9 @@ protected HandlerHolder createHandlerHolder(HandlerRegistration regist return new HandlerHolder<>(registration, localOnly, context); } - protected void removeRegistration(HandlerHolder handlerHolder, boolean replyHandler, Promise promise) { + protected void removeRegistration(HandlerHolder handlerHolder, boolean broadcast, Promise promise) { removeLocalRegistration(handlerHolder); - if (!replyHandler) { + if (broadcast) { onLocalUnregistration(handlerHolder, promise); } else { promise.complete(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index 81da3106d93..5727cc59a47 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -58,13 +58,13 @@ void receive(MessageImpl msg) { protected abstract void dispatch(Message msg, ContextInternal context, Handler> handler); - synchronized void register(String repliedAddress, boolean localOnly, Promise promise) { + synchronized void register(boolean broadcast, boolean localOnly, Promise promise) { if (registered != null) { throw new IllegalStateException(); } - registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise); + registered = bus.addRegistration(address, this, broadcast, localOnly, promise); if (bus.metrics != null) { - metric = bus.metrics.handlerRegistered(address, repliedAddress); + metric = bus.metrics.handlerRegistered(address, null /* regression */); } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 776cbe0800d..0c9d0429ca5 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -216,7 +216,7 @@ public synchronized MessageConsumer handler(Handler> h) { registered = true; Promise p = result; Promise registration = context.promise(); - register(null, localOnly, registration); + register(true, localOnly, registration); registration.future().onComplete(ar -> { if (ar.succeeded()) { p.tryComplete(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index b725cef1da4..2ce310eedde 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -83,7 +83,7 @@ protected boolean doReceive(Message reply) { } void register() { - register(repliedAddress, false, null); + register(false, false, null); } @Override diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java index 43c3bd7401a..f57705b26f6 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java @@ -429,7 +429,7 @@ public void testHandlerMetricReply() throws Exception { assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); assertWaitUntil(() -> metrics.getRegistrations().size() == 2); HandlerMetric registration = metrics.getRegistrations().get(1); - assertEquals(ADDRESS1, registration.repliedAddress); + assertEquals(null, registration.repliedAddress); // new behavior assertEquals(0, registration.scheduleCount.get()); assertEquals(0, registration.deliveredCount.get()); assertEquals(0, registration.localDeliveredCount.get()); @@ -443,13 +443,13 @@ public void testHandlerMetricReply() throws Exception { vertx.eventBus().request(ADDRESS1, "ping").onComplete(onSuccess(reply -> { assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); HandlerMetric registration = replyRegistration.get(); - assertEquals(ADDRESS1, registration.repliedAddress); + assertEquals(null, registration.repliedAddress); assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.deliveredCount.get()); assertEquals(1, registration.localDeliveredCount.get()); vertx.runOnContext(v -> { assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); - assertEquals(ADDRESS1, registration.repliedAddress); + assertEquals(null, registration.repliedAddress); assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.deliveredCount.get()); assertEquals(1, registration.localDeliveredCount.get());