diff --git a/external/coreprofile-tck/rest/runner/pom.xml b/external/coreprofile-tck/rest/runner/pom.xml index 134b4a853..2ed321b2a 100644 --- a/external/coreprofile-tck/rest/runner/pom.xml +++ b/external/coreprofile-tck/rest/runner/pom.xml @@ -104,6 +104,9 @@ jakarta.ws.rs:jakarta-restful-ws-tck + + ee/jakarta/tck/ws/rs/jaxrs21/ee/sse/ssebroadcaster/JAXRSClientIT.java + 1 false diff --git a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java index a48a18193..9379fa9ba 100644 --- a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java +++ b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java @@ -94,6 +94,15 @@ public void string(@Context SseEventSink eventSink) { public void broadcast(String message) { broadcastBean.broadcast("Message"); } + + /** + * Close the broadcast. + */ + @Path("close") + @GET + public void close() { + broadcastBean.close(); + } /** * Register to receive messages. @@ -102,6 +111,7 @@ public void broadcast(String message) { */ @Path("register") @GET + @Produces(SERVER_SENT_EVENTS) public void register(@Context SseEventSink eventSink) { broadcastBean.register(eventSink); } diff --git a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java index 7f82e2805..368e45531 100644 --- a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java +++ b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java @@ -31,9 +31,8 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.sse.OutboundSseEvent; import jakarta.ws.rs.sse.Sse; +import jakarta.ws.rs.sse.SseBroadcaster; import jakarta.ws.rs.sse.SseEventSink; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** * The single and one and only SSE broadcast bean. @@ -44,9 +43,9 @@ public class SseBroadcastBean { /** - * List of SSE event sinks. + * Stores the broadcaster. */ - private final List sinks = new CopyOnWriteArrayList<>(); + private SseBroadcaster broadcaster; /** * Store the SSE. @@ -54,13 +53,24 @@ public class SseBroadcastBean { @Context private Sse sse; + /** + * Constructor. + */ + public SseBroadcastBean() { + } + /** * Register the given SSE event sink. * * @param sink the SSE event sink. */ public void register(SseEventSink sink) { - sinks.add(sink); + synchronized (sse) { + if (broadcaster == null) { + broadcaster = sse.newBroadcaster(); + } + } + broadcaster.register(sink); } /** @@ -69,13 +79,21 @@ public void register(SseEventSink sink) { * @param message the message. */ public void broadcast(String message) { - for (int i = 1; i <= 10; i++) { - String eventMessage = message + " #" + i; - OutboundSseEvent event = sse.newEventBuilder() - .name("message") - .data(String.class, eventMessage) - .build(); - sinks.forEach(sink -> sink.send(event)); + if (broadcaster != null) { + for (int i = 1; i <= 10; i++) { + String eventMessage = message + " #" + i; + OutboundSseEvent event = sse.newEventBuilder() + .data(String.class, eventMessage) + .build(); + broadcaster.broadcast(event); + } } } + + /** + * Close the event sinks. + */ + public void close() { + broadcaster.close(); + } } diff --git a/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java b/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java index 2bc076090..a7f377663 100644 --- a/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java +++ b/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java @@ -40,9 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -65,7 +62,6 @@ void testSseString() throws Exception { .build(); HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); - System.out.println(response.body()); assertNotNull(response.body()); assertTrue(response.body().contains("data: Event 4")); } @@ -75,11 +71,10 @@ void testSseString() throws Exception { * * @throw Exception when a serious error occurs. */ - @Disabled @Test void testSseBroadcast() throws Exception { - List messages = new ArrayList<>(); + List events = new ArrayList<>(); HttpClient client = HttpClient.newBuilder() .connectTimeout(Duration.ofSeconds(120)) .build(); @@ -90,11 +85,17 @@ void testSseBroadcast() throws Exception { CompletableFuture future = client.sendAsync(HttpRequest.newBuilder() .uri(new URI(baseUrl + "/sse/register")) .build(), BodyHandlers.ofLines()) - .thenAccept(response -> response.body().forEach(message -> { - messages.add(message); - System.out.println("Received message: " + message); + .thenAccept(response -> response.body().forEach(event -> { + if (!event.isBlank()) { + events.add(event); + } })); + /* + * Give client time to bootstrap. + */ + Thread.sleep(2000); + /* * Simulate server broadcast. */ @@ -109,13 +110,27 @@ void testSseBroadcast() throws Exception { try { future.get(10, TimeUnit.SECONDS); } catch (Exception e) { - fail("Test timed out"); } /* * Check if we have received 10 events. */ - assertEquals(10, messages.size(), "Should have received 10 events"); - messages.forEach(System.out::println); + assertEquals(10, events.size(), "Should have received 10 events"); + + /* + * Close broadcast. + */ + HttpResponse closeResponse = client.send(HttpRequest.newBuilder() + .uri(new URI(baseUrl + "/sse/close")) + .timeout(Duration.ofMinutes(5)) + .GET() + .build(), HttpResponse.BodyHandlers.discarding()); + + /* + * Give the server time to close. + */ + Thread.sleep(2000); + + assertEquals(204, closeResponse.statusCode()); } }