Skip to content

Commit

Permalink
Fixes #4405 - Exclude TCK SSE broadcast test (#4406)
Browse files Browse the repository at this point in the history
  • Loading branch information
mnriem authored Dec 17, 2024
1 parent 3c40518 commit 35077e8
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 24 deletions.
3 changes: 3 additions & 0 deletions external/coreprofile-tck/rest/runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@
<dependenciesToScan>
<dependency>jakarta.ws.rs:jakarta-restful-ws-tck</dependency>
</dependenciesToScan>
<excludes>
<exclude>ee/jakarta/tck/ws/rs/jaxrs21/ee/sse/ssebroadcaster/JAXRSClientIT.java</exclude>
</excludes>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<systemPropertyVariables>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public void string(@Context SseEventSink eventSink) {
public void broadcast(String message) {
broadcastBean.broadcast("Message");
}

/**
* Close the broadcast.
*/
@Path("close")
@GET
public void close() {
broadcastBean.close();
}

/**
* Register to receive messages.
Expand All @@ -102,6 +111,7 @@ public void broadcast(String message) {
*/
@Path("register")
@GET
@Produces(SERVER_SENT_EVENTS)
public void register(@Context SseEventSink eventSink) {
broadcastBean.register(eventSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseBroadcaster;
import jakarta.ws.rs.sse.SseEventSink;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* The single and one and only SSE broadcast bean.
Expand All @@ -44,23 +43,34 @@
public class SseBroadcastBean {

/**
* List of SSE event sinks.
* Stores the broadcaster.
*/
private final List<SseEventSink> sinks = new CopyOnWriteArrayList<>();
private SseBroadcaster broadcaster;

/**
* Store the SSE.
*/
@Context
private Sse sse;

/**
* Constructor.
*/
public SseBroadcastBean() {
}

/**
* Register the given SSE event sink.
*
* @param sink the SSE event sink.
*/
public void register(SseEventSink sink) {
sinks.add(sink);
synchronized (sse) {
if (broadcaster == null) {
broadcaster = sse.newBroadcaster();
}
}
broadcaster.register(sink);
}

/**
Expand All @@ -69,13 +79,21 @@ public void register(SseEventSink sink) {
* @param message the message.
*/
public void broadcast(String message) {
for (int i = 1; i <= 10; i++) {
String eventMessage = message + " #" + i;
OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, eventMessage)
.build();
sinks.forEach(sink -> sink.send(event));
if (broadcaster != null) {
for (int i = 1; i <= 10; i++) {
String eventMessage = message + " #" + i;
OutboundSseEvent event = sse.newEventBuilder()
.data(String.class, eventMessage)
.build();
broadcaster.broadcast(event);
}
}
}

/**
* Close the event sinks.
*/
public void close() {
broadcaster.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -65,7 +62,6 @@ void testSseString() throws Exception {
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());
assertNotNull(response.body());
assertTrue(response.body().contains("data: Event 4"));
}
Expand All @@ -75,11 +71,10 @@ void testSseString() throws Exception {
*
* @throw Exception when a serious error occurs.
*/
@Disabled
@Test
void testSseBroadcast() throws Exception {

List<String> messages = new ArrayList<>();
List<String> events = new ArrayList<>();
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(120))
.build();
Expand All @@ -90,11 +85,17 @@ void testSseBroadcast() throws Exception {
CompletableFuture<Void> future = client.sendAsync(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/register"))
.build(), BodyHandlers.ofLines())
.thenAccept(response -> response.body().forEach(message -> {
messages.add(message);
System.out.println("Received message: " + message);
.thenAccept(response -> response.body().forEach(event -> {
if (!event.isBlank()) {
events.add(event);
}
}));

/*
* Give client time to bootstrap.
*/
Thread.sleep(2000);

/*
* Simulate server broadcast.
*/
Expand All @@ -109,13 +110,27 @@ void testSseBroadcast() throws Exception {
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Test timed out");
}

/*
* Check if we have received 10 events.
*/
assertEquals(10, messages.size(), "Should have received 10 events");
messages.forEach(System.out::println);
assertEquals(10, events.size(), "Should have received 10 events");

/*
* Close broadcast.
*/
HttpResponse<Void> closeResponse = client.send(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/close"))
.timeout(Duration.ofMinutes(5))
.GET()
.build(), HttpResponse.BodyHandlers.discarding());

/*
* Give the server time to close.
*/
Thread.sleep(2000);

assertEquals(204, closeResponse.statusCode());
}
}

0 comments on commit 35077e8

Please sign in to comment.