Skip to content

Commit

Permalink
feat: add support for mutiny based reactive endpoints
Browse files Browse the repository at this point in the history
Allows Hilla endpoints to specify Mutiny Multi as return type for reactive endpoints.
Also introduces MutinyEndpointSubscription as a Mutiny alternative to Hilla EndpointSubscription
to provide unsubscribe callback.

Fixes #1028
  • Loading branch information
mcollovati committed Nov 23, 2024
1 parent 09e4fc9 commit 3b11e50
Show file tree
Hide file tree
Showing 13 changed files with 905 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2023 Marco Collovati, Dario Götze
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.mcollovati.quarkus.hilla.deployment;

import io.quarkus.test.QuarkusUnitTest;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.github.mcollovati.quarkus.hilla.deployment.endpoints.MutinyReactiveEndpoint;

class MutinyReactiveEndpointTest extends AbstractReactiveEndpointTest {
private static final String ENDPOINT_NAME = MutinyReactiveEndpoint.class.getSimpleName();

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withConfigurationResource(testResource("test-application.properties"))
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(MutinyReactiveEndpoint.class, HillaPushClient.class));

@Override
public String getEndpointName() {
return ENDPOINT_NAME;
}

private static String testResource(String name) {
return MutinyReactiveEndpointTest.class.getPackageName().replace('.', '/') + '/' + name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2023 Marco Collovati, Dario Götze
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.mcollovati.quarkus.hilla.deployment.endpoints;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.vaadin.flow.server.auth.AnonymousAllowed;
import com.vaadin.hilla.BrowserCallable;
import io.smallrye.mutiny.Multi;

import com.github.mcollovati.quarkus.hilla.MutinyEndpointSubscription;

@BrowserCallable
@AnonymousAllowed
public class MutinyReactiveEndpoint {

private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

public Multi<Integer> count(String counterName) {
Duration interval = Duration.ofMillis(200);
return Multi.createFrom()
.ticks()
.startingAfter(interval)
.every(interval)
.map(_interval -> counters.computeIfAbsent(counterName, unused -> new AtomicInteger())
.incrementAndGet());
}

public MutinyEndpointSubscription<Integer> cancelableCount(String counterName) {
return MutinyEndpointSubscription.of(count(counterName), () -> {
counters.get(counterName).set(-1);
});
}

public Integer counterValue(String counterName) {
if (counters.containsKey(counterName)) {
return counters.get(counterName).get();
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 Marco Collovati, Dario Götze
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.mcollovati.quarkus.hilla;

import io.smallrye.mutiny.Multi;

/**
* A subscription that wraps a Multi and allows to listen for unsubscribe events
* from the browser.
* <p>
* An unsubscribe event is sent when "cancel" is called in the browser but also
* if the browser has disconnected from the server either explicitly or been
* disconnected from the server for a long enough time.
* <p>
* Attribution:
* This file is based on work from Vaadin Ltd.
* Copyright 2000-2024 Vaadin Ltd. https://vaadin.com
* Original source: https://github.com/vaadin/hilla/blob/main/packages/java/endpoint/src/main/java/com/vaadin/hilla/EndpointSubscription.java
* Changes made:
* - Replaced reactor Flux type with Mutiny Multi.
*/
public class MutinyEndpointSubscription<TT> {

private Multi<TT> multi;
private Runnable onUnsubscribe;

private MutinyEndpointSubscription(Multi<TT> multi, Runnable onUnsubscribe) {
this.multi = multi;
this.onUnsubscribe = onUnsubscribe;
}

/**
* Returns the multi value provide for this subscription.
*/
public Multi<TT> getMulti() {
return multi;
}

/**
* Returns the callback that is invoked when the browser unsubscribes from
* the subscription.
*/
public Runnable getOnUnsubscribe() {
return onUnsubscribe;
}

/**
* Creates a new endpoint subscription.
*
* A subscription wraps a multi that provides the values for the subscriber
* (browser) and a callback that is invoked when the browser unsubscribes
* from the subscription.
*
* @param <T>
* the type of data in the subscription
* @param flux
* the multi that produces the data
* @param onDisconnect
* a callback that is invoked when the browser unsubscribes
* @return a subscription
*/
public static <T> MutinyEndpointSubscription<T> of(Multi<T> flux, Runnable onDisconnect) {
return new MutinyEndpointSubscription<>(flux, onDisconnect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.quarkus.arc.Unremovable;
import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.springframework.context.ApplicationContext;

@Unremovable
Expand Down Expand Up @@ -147,9 +148,15 @@ EndpointInvoker endpointInvoker(
JacksonObjectMapperFactory objectMapperFactory,
ExplicitNullableTypeChecker explicitNullableTypeChecker,
ServletContext servletContext,
EndpointRegistry endpointRegistry) {
return new EndpointInvoker(
applicationContext, objectMapperFactory, explicitNullableTypeChecker, servletContext, endpointRegistry);
EndpointRegistry endpointRegistry,
ManagedExecutor executor) {
return new QuarkusEndpointInvoker(
applicationContext,
objectMapperFactory,
explicitNullableTypeChecker,
servletContext,
endpointRegistry,
executor);
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2024 Marco Collovati, Dario Götze
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.mcollovati.quarkus.hilla;

import jakarta.servlet.ServletContext;
import java.security.Principal;
import java.util.function.Function;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.vaadin.hilla.EndpointInvocationException;
import com.vaadin.hilla.EndpointInvoker;
import com.vaadin.hilla.EndpointRegistry;
import com.vaadin.hilla.EndpointSubscription;
import com.vaadin.hilla.ExplicitNullableTypeChecker;
import com.vaadin.hilla.parser.jackson.JacksonObjectMapperFactory;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.springframework.context.ApplicationContext;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* Extension of EndpointInvoker that handles transformations for Quarkus types.
* <p></p>
* Implemented transformations:
* - Multi -> Flux
*/
public class QuarkusEndpointInvoker extends EndpointInvoker {

private final Scheduler scheduler;

/**
* Creates an instance of this bean.
*
* @param applicationContext The Spring application context
* @param endpointMapperFactory optional factory bean to override the default
* {@link JacksonObjectMapperFactory} that is used for
* serializing and deserializing request and response bodies Use
* {@link com.vaadin.hilla.EndpointController#ENDPOINT_MAPPER_FACTORY_BEAN_QUALIFIER}
* qualifier to override the mapper.
* @param explicitNullableTypeChecker the method parameter and return value type checker to verify
* that null values are explicit
* @param servletContext the servlet context
* @param endpointRegistry the registry used to store endpoint information
*/
public QuarkusEndpointInvoker(
ApplicationContext applicationContext,
JacksonObjectMapperFactory endpointMapperFactory,
ExplicitNullableTypeChecker explicitNullableTypeChecker,
ServletContext servletContext,
EndpointRegistry endpointRegistry,
ManagedExecutor executor) {
super(applicationContext, endpointMapperFactory, explicitNullableTypeChecker, servletContext, endpointRegistry);
scheduler = Schedulers.fromExecutor(executor);
}

@Override
public Class<?> getReturnType(String endpointName, String methodName) {
Class<?> returnType = super.getReturnType(endpointName, methodName);

if (returnType != null
&& (Multi.class.isAssignableFrom(returnType)
|| MutinyEndpointSubscription.class.isAssignableFrom(returnType))) {
return EndpointSubscription.class;
}
return returnType;
}

@Override
public Object invoke(
String endpointName,
String methodName,
ObjectNode body,
Principal principal,
Function<String, Boolean> rolesChecker)
throws EndpointInvocationException.EndpointNotFoundException,
EndpointInvocationException.EndpointAccessDeniedException,
EndpointInvocationException.EndpointBadRequestException,
EndpointInvocationException.EndpointInternalException {
Object object = super.invoke(endpointName, methodName, body, principal, rolesChecker);
if (object instanceof Multi<?> multi) {
object = multiToEndpointSubscription(multi, null);
} else if (object instanceof MutinyEndpointSubscription<?> endpointSubscription) {
object = multiToEndpointSubscription(
endpointSubscription.getMulti(), endpointSubscription.getOnUnsubscribe());
}
return object;
}

@SuppressWarnings({"MutinyCallingSubscribeInNonBlockingScope", "ReactiveStreamsPublisherImplementation"})
private EndpointSubscription<?> multiToEndpointSubscription(Multi<?> multi, Runnable onUnsubscribe) {
OnDisconnect onDisconnect = new OnDisconnect(onUnsubscribe);
Flux<?> flux = Flux.from(subscribe -> {
Cancellable cancelable =
multi.subscribe().with(subscribe::onNext, subscribe::onError, subscribe::onComplete);
onDisconnect.setCancellable(cancelable);
})
.cancelOn(scheduler)
.subscribeOn(scheduler)
.publishOn(scheduler);
return EndpointSubscription.of(flux, onDisconnect);
}

private static class OnDisconnect implements Runnable {
private final Runnable onUnsubscribe;
private Cancellable cancellable;

OnDisconnect(Runnable onUnsubscribe) {
this.onUnsubscribe = onUnsubscribe;
}

void setCancellable(Cancellable cancellable) {
this.cancellable = cancellable;
}

@Override
public void run() {
if (cancellable != null) {
cancellable.cancel();
}
if (onUnsubscribe != null) {
onUnsubscribe.run();
}
}
}
}
Loading

0 comments on commit 3b11e50

Please sign in to comment.