Skip to content

Commit

Permalink
Use a broadcast flag when registering a handler registration
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 31, 2023
1 parent 6799425 commit 11e9b67
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 13 deletions.
10 changes: 5 additions & 5 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,17 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers,
return msg;
}

protected <T> Consumer<Promise<Void>> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
protected <T> Consumer<Promise<Void>> addRegistration(String address, HandlerRegistration<T> registration, boolean broadcast, boolean localOnly, Promise<Void> promise) {
HandlerHolder<T> holder = addLocalRegistration(address, registration, localOnly);
if (!replyHandler) {
if (broadcast) {
onLocalRegistration(holder, promise);
} else {
if (promise != null) {
promise.complete();
}
}
return p -> {
removeRegistration(holder, replyHandler, p);
removeRegistration(holder, broadcast, p);
};
}

Expand Down Expand Up @@ -303,9 +303,9 @@ protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> regist
return new HandlerHolder<>(registration, localOnly, context);
}

protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, boolean replyHandler, Promise<Void> promise) {
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, boolean broadcast, Promise<Void> promise) {
removeLocalRegistration(handlerHolder);
if (!replyHandler) {
if (broadcast) {
onLocalUnregistration(handlerHolder, promise);
} else {
promise.complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ void receive(MessageImpl msg) {

protected abstract void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);

synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
synchronized void register(boolean broadcast, boolean localOnly, Promise<Void> promise) {
if (registered != null) {
throw new IllegalStateException();
}
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
registered = bus.addRegistration(address, this, broadcast, localOnly, promise);
if (bus.metrics != null) {
metric = bus.metrics.handlerRegistered(address, repliedAddress);
metric = bus.metrics.handlerRegistered(address, null /* regression */);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
registered = true;
Promise<Void> p = result;
Promise<Void> registration = context.promise();
register(null, localOnly, registration);
register(true, localOnly, registration);
registration.future().onComplete(ar -> {
if (ar.succeeded()) {
p.tryComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected boolean doReceive(Message<T> reply) {
}

void register() {
register(repliedAddress, false, null);
register(false, false, null);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/vertx/core/spi/metrics/MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public void testHandlerMetricReply() throws Exception {
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
assertWaitUntil(() -> metrics.getRegistrations().size() == 2);
HandlerMetric registration = metrics.getRegistrations().get(1);
assertEquals(ADDRESS1, registration.repliedAddress);
assertEquals(null, registration.repliedAddress); // new behavior
assertEquals(0, registration.scheduleCount.get());
assertEquals(0, registration.deliveredCount.get());
assertEquals(0, registration.localDeliveredCount.get());
Expand All @@ -443,13 +443,13 @@ public void testHandlerMetricReply() throws Exception {
vertx.eventBus().request(ADDRESS1, "ping").onComplete(onSuccess(reply -> {
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
HandlerMetric registration = replyRegistration.get();
assertEquals(ADDRESS1, registration.repliedAddress);
assertEquals(null, registration.repliedAddress);
assertEquals(1, registration.scheduleCount.get());
assertEquals(1, registration.deliveredCount.get());
assertEquals(1, registration.localDeliveredCount.get());
vertx.runOnContext(v -> {
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
assertEquals(ADDRESS1, registration.repliedAddress);
assertEquals(null, registration.repliedAddress);
assertEquals(1, registration.scheduleCount.get());
assertEquals(1, registration.deliveredCount.get());
assertEquals(1, registration.localDeliveredCount.get());
Expand Down

0 comments on commit 11e9b67

Please sign in to comment.