Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #4405 - Exclude TCK SSE broadcast test #4406

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions external/coreprofile-tck/rest/runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@
<dependenciesToScan>
<dependency>jakarta.ws.rs:jakarta-restful-ws-tck</dependency>
</dependenciesToScan>
<excludes>
<exclude>ee/jakarta/tck/ws/rs/jaxrs21/ee/sse/ssebroadcaster/JAXRSClientIT.java</exclude>
</excludes>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<systemPropertyVariables>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public void string(@Context SseEventSink eventSink) {
public void broadcast(String message) {
broadcastBean.broadcast("Message");
}

/**
* Close the broadcast.
*/
@Path("close")
@GET
public void close() {
broadcastBean.close();
}

/**
* Register to receive messages.
Expand All @@ -102,6 +111,7 @@ public void broadcast(String message) {
*/
@Path("register")
@GET
@Produces(SERVER_SENT_EVENTS)
public void register(@Context SseEventSink eventSink) {
broadcastBean.register(eventSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseBroadcaster;
import jakarta.ws.rs.sse.SseEventSink;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* The single and one and only SSE broadcast bean.
Expand All @@ -44,23 +43,34 @@
public class SseBroadcastBean {

/**
* List of SSE event sinks.
* Stores the broadcaster.
*/
private final List<SseEventSink> sinks = new CopyOnWriteArrayList<>();
private SseBroadcaster broadcaster;

/**
* Store the SSE.
*/
@Context
private Sse sse;

/**
* Constructor.
*/
public SseBroadcastBean() {
}

/**
* Register the given SSE event sink.
*
* @param sink the SSE event sink.
*/
public void register(SseEventSink sink) {
sinks.add(sink);
synchronized (sse) {
if (broadcaster == null) {
broadcaster = sse.newBroadcaster();
}
}
broadcaster.register(sink);
}

/**
Expand All @@ -69,13 +79,21 @@ public void register(SseEventSink sink) {
* @param message the message.
*/
public void broadcast(String message) {
for (int i = 1; i <= 10; i++) {
String eventMessage = message + " #" + i;
OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, eventMessage)
.build();
sinks.forEach(sink -> sink.send(event));
if (broadcaster != null) {
for (int i = 1; i <= 10; i++) {
String eventMessage = message + " #" + i;
OutboundSseEvent event = sse.newEventBuilder()
.data(String.class, eventMessage)
.build();
broadcaster.broadcast(event);
}
}
}

/**
* Close the event sinks.
*/
public void close() {
broadcaster.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -65,7 +62,6 @@ void testSseString() throws Exception {
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());
assertNotNull(response.body());
assertTrue(response.body().contains("data: Event 4"));
}
Expand All @@ -75,11 +71,10 @@ void testSseString() throws Exception {
*
* @throw Exception when a serious error occurs.
*/
@Disabled
@Test
void testSseBroadcast() throws Exception {

List<String> messages = new ArrayList<>();
List<String> events = new ArrayList<>();
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(120))
.build();
Expand All @@ -90,11 +85,17 @@ void testSseBroadcast() throws Exception {
CompletableFuture<Void> future = client.sendAsync(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/register"))
.build(), BodyHandlers.ofLines())
.thenAccept(response -> response.body().forEach(message -> {
messages.add(message);
System.out.println("Received message: " + message);
.thenAccept(response -> response.body().forEach(event -> {
if (!event.isBlank()) {
events.add(event);
}
}));

/*
* Give client time to bootstrap.
*/
Thread.sleep(2000);

/*
* Simulate server broadcast.
*/
Expand All @@ -109,13 +110,27 @@ void testSseBroadcast() throws Exception {
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Test timed out");
}

/*
* Check if we have received 10 events.
*/
assertEquals(10, messages.size(), "Should have received 10 events");
messages.forEach(System.out::println);
assertEquals(10, events.size(), "Should have received 10 events");

/*
* Close broadcast.
*/
HttpResponse<Void> closeResponse = client.send(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/close"))
.timeout(Duration.ofMinutes(5))
.GET()
.build(), HttpResponse.BodyHandlers.discarding());

/*
* Give the server time to close.
*/
Thread.sleep(2000);

assertEquals(204, closeResponse.statusCode());
}
}
Loading