Skip to content

Commit

Permalink
Fixes #4403 - Fix GrizzlyServer to not suspend indefinitely for async…
Browse files Browse the repository at this point in the history
… requests
  • Loading branch information
mnriem committed Dec 15, 2024
1 parent cb65ecf commit 037bebc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void service(Request request, Response response) throws Exception {
GrizzlyHttpServerResponse gResponse = new GrizzlyHttpServerResponse(response);
HttpServerProcessorEndState state = httpServerProcessor.process(gRequest, gResponse);
if (state == ASYNCED) {
response.suspend();
response.suspend(60, SECONDS);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
*/
package cloud.piranha.test.coreprofile.distribution;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.WebApplicationException;
Expand All @@ -46,6 +48,12 @@
@Path("/sse")
public class SseBean {

/**
* Stores the broadcaster.
*/
@Inject
private SseBroadcastBean broadcastBean;

/**
* Stores the SSE context.
*/
Expand Down Expand Up @@ -75,4 +83,26 @@ public void string(@Context SseEventSink eventSink) {
}
}).start();
}

/**
* Perform a SSE Broadcast.
*
* @param message the message to broadcast.
*/
@Path("broadcast")
@POST
public void broadcast(String message) {
broadcastBean.broadcast("Message");
}

/**
* Register to receive messages.
*
* @param eventSink the event sink.
*/
@Path("register")
@GET
public void register(@Context SseEventSink eventSink) {
broadcastBean.register(eventSink);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,18 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -59,4 +69,53 @@ void testSseString() throws Exception {
assertNotNull(response.body());
assertTrue(response.body().contains("data: Event 4"));
}

/**
* Test SSE broadcast.
*
* @throw Exception when a serious error occurs.
*/
@Disabled
@Test
void testSseBroadcast() throws Exception {

List<String> messages = new ArrayList<>();
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(120))
.build();

/*
* Register client and collect events.
*/
CompletableFuture<Void> future = client.sendAsync(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/register"))
.build(), BodyHandlers.ofLines())
.thenAccept(response -> response.body().forEach(message -> {
messages.add(message);
System.out.println("Received message: " + message);
}));

/*
* Simulate server broadcast.
*/
client.send(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/broadcast"))
.POST(HttpRequest.BodyPublishers.ofString("Broadcast message"))
.build(), HttpResponse.BodyHandlers.ofString());

/*
* Wait for the future to complete.
*/
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Test timed out");
}

/*
* Check if we have received 10 events.
*/
assertEquals(10, messages.size(), "Should have received 10 events");
messages.forEach(System.out::println);
}
}

0 comments on commit 037bebc

Please sign in to comment.