Skip to content

Commit

Permalink
The implementation of HTTP/1.x connection close immediately close the…
Browse files Browse the repository at this point in the history
… HTTP connection when the response has been sent regargless of the request status.

The responsibility of closing the connection has been moved from Http1xServerResponse to Http1xServerConnection which now computes and maintains the keep alive status of the connection and becomes responsible to close the connection when it should not be kept alive at the appropriate lifecycle of the connection (that is when the response has been sent and the corresponding request received).
  • Loading branch information
vietj committed Aug 9, 2023
1 parent fafa359 commit 17dd4d1
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 18 deletions.
39 changes: 33 additions & 6 deletions src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocket

private Http1xServerRequest requestInProgress;
private Http1xServerRequest responseInProgress;
private boolean keepAlive;
private boolean channelPaused;
private boolean writable;
private Handler<HttpServerRequest> requestHandler;
Expand All @@ -125,6 +126,7 @@ public Http1xServerConnection(Supplier<ContextInternal> streamContextSupplier,
this.handle100ContinueAutomatically = options.isHandle100ContinueAutomatically();
this.tracingPolicy = options.getTracingPolicy();
this.writable = true;
this.keepAlive = true;
}

TracingPolicy tracingPolicy() {
Expand All @@ -150,6 +152,10 @@ public HttpServerMetrics metrics() {

public void handleMessage(Object msg) {
assert msg != null;
if (requestInProgress == null && !keepAlive) {
// Discard message
return;
}
// fast-path first
if (msg == LastHttpContent.EMPTY_LAST_CONTENT) {
onEnd();
Expand All @@ -164,7 +170,8 @@ public void handleMessage(Object msg) {
return;
}
responseInProgress = requestInProgress;
req.handleBegin(writable);
keepAlive = HttpUtils.isKeepAlive(request);
req.handleBegin(writable, keepAlive);
Handler<HttpServerRequest> handler = request.decoderResult().isSuccess() ? requestHandler : invalidRequestHandler;
req.context.emit(req, handler);
} else {
Expand Down Expand Up @@ -206,12 +213,23 @@ private void onContent(Object msg) {
}

private void onEnd() {
boolean close;
Http1xServerRequest request;
synchronized (this) {
request = requestInProgress;
requestInProgress = null;
close = !keepAlive && responseInProgress == null;
}
request.context.execute(request, Http1xServerRequest::handleEnd);
if (close) {
flushAndClose();
}
}

private void flushAndClose() {
ChannelPromise channelFuture = channelFuture();
writeToChannel(Unpooled.EMPTY_BUFFER, channelFuture);
channelFuture.addListener(fut -> close());
}

void responseComplete() {
Expand All @@ -224,10 +242,18 @@ void responseComplete() {
responseInProgress = null;
DecoderResult result = request.decoderResult();
if (result.isSuccess()) {
Http1xServerRequest next = request.next();
if (next != null) {
// Handle pipelined request
handleNext(next);
if (keepAlive) {
Http1xServerRequest next = request.next();
if (next != null) {
// Handle pipelined request
handleNext(next);
}
} else {
if (requestInProgress == request) {
// Deferred
} else {
flushAndClose();
}
}
} else {
ChannelPromise channelFuture = channelFuture();
Expand All @@ -241,7 +267,8 @@ void responseComplete() {

private void handleNext(Http1xServerRequest next) {
responseInProgress = next;
next.handleBegin(writable);
keepAlive = HttpUtils.isKeepAlive(next.nettyRequest());
next.handleBegin(writable, keepAlive);
next.context.emit(next, next_ -> {
next_.resume();
Handler<HttpServerRequest> handler = next_.nettyRequest().decoderResult().isSuccess() ? requestHandler : invalidRequestHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ void handleContent(Buffer buffer) {
}
}

void handleBegin(boolean writable) {
void handleBegin(boolean writable, boolean keepAlive) {
if (METRICS_ENABLED) {
reportRequestBegin();
}
response = new Http1xServerResponse((VertxInternal) conn.vertx(), context, conn, request, metric, writable);
response = new Http1xServerResponse((VertxInternal) conn.vertx(), context, conn, request, metric, writable, keepAlive);
if (conn.handle100ContinueAutomatically) {
check100();
}
Expand Down
14 changes: 4 additions & 10 deletions src/main/java/io/vertx/core/http/impl/Http1xServerResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public class Http1xServerResponse implements HttpServerResponse, HttpResponse {
Http1xServerConnection conn,
HttpRequest request,
Object requestMetric,
boolean writable) {
boolean writable,
boolean keepAlive) {
this.vertx = vertx;
this.conn = conn;
this.context = context;
Expand All @@ -115,8 +116,7 @@ public class Http1xServerResponse implements HttpServerResponse, HttpResponse {
this.status = HttpResponseStatus.OK;
this.requestMetric = requestMetric;
this.writable = writable;
this.keepAlive = (version == HttpVersion.HTTP_1_1 && !request.headers().contains(io.vertx.core.http.HttpHeaders.CONNECTION, HttpHeaders.CLOSE, true))
|| (version == HttpVersion.HTTP_1_0 && request.headers().contains(io.vertx.core.http.HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE, true));
this.keepAlive = keepAlive;
this.head = request.method() == io.netty.handler.codec.http.HttpMethod.HEAD;
}

Expand Down Expand Up @@ -409,8 +409,7 @@ private void end(Buffer chunk, PromiseInternal<Void> listener) {
endHandler.handle(null);
}
if (!keepAlive) {
closeConnAfterWrite();
closed = true;
closed = true; // ?????
}
}
}
Expand Down Expand Up @@ -482,11 +481,6 @@ public Future<Void> sendFile(String filename, long offset, long length) {
if (future.isSuccess()) {
ChannelPromise pr = conn.channelHandlerContext().newPromise();
conn.writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT, pr);
if (!keepAlive) {
pr.addListener(a -> {
closeConnAfterWrite();
});
}
}

// signal body end handler
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,11 @@ static Future<AsyncFile> resolveFile(ContextInternal context, String filename, l
static boolean isConnectOrUpgrade(io.vertx.core.http.HttpMethod method, MultiMap headers) {
return method == io.vertx.core.http.HttpMethod.CONNECT || (method == io.vertx.core.http.HttpMethod.GET && headers.contains(io.vertx.core.http.HttpHeaders.CONNECTION, io.vertx.core.http.HttpHeaders.UPGRADE, true));
}

static boolean isKeepAlive(HttpRequest request) {
HttpVersion version = request.protocolVersion();
return (version == HttpVersion.HTTP_1_1 && !request.headers().contains(io.vertx.core.http.HttpHeaders.CONNECTION, io.vertx.core.http.HttpHeaders.CLOSE, true))
|| (version == HttpVersion.HTTP_1_0 && request.headers().contains(io.vertx.core.http.HttpHeaders.CONNECTION, io.vertx.core.http.HttpHeaders.KEEP_ALIVE, true));
}

}
72 changes: 72 additions & 0 deletions src/test/java/io/vertx/core/http/Http1xTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.vertx.test.verticles.SimpleServer;
import io.vertx.test.core.TestUtils;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -1426,6 +1427,76 @@ public void testServerPipeliningConnectionConcurrency() throws Exception {
await();
}

@Test
public void testServerConnectionCloseBeforeRequestEnded() throws Exception {
testServerConnectionClose(true);
}

@Test
public void testServerConnectionCloseAfterRequestEnded() throws Exception {
testServerConnectionClose(false);
}

private void testServerConnectionClose(boolean sendEarlyResponse) throws Exception {
CompletableFuture<HttpServerRequest> requestLatch = new CompletableFuture<>();
server.requestHandler(requestLatch::complete);
startServer(testAddress);
NetClient client = vertx.createNetClient();
client.connect(testAddress).onComplete(onSuccess(so -> {
so.write(
"PUT / HTTP/1.1 \r\n" +
"connection: close\r\n" +
"content-length: 1\r\n" +
"\r\n");
requestLatch.whenComplete((req, err) -> {
if (sendEarlyResponse) {
req.response().end();
} else {
req.endHandler(v -> {
req.response().end();
});
}
so.write("A");
});
Buffer response = Buffer.buffer();
so.handler(response::appendBuffer);
so.closeHandler(v -> {
assertTrue(response.toString().startsWith("HTTP/1.1 200 OK"));
testComplete();
});
}));
await();
}

@Test
public void testServerConnectionCloseDoesNotProcessHTTPMessages() throws Exception {
AtomicInteger requestCount = new AtomicInteger();
server.requestHandler(req -> {
requestCount.incrementAndGet();
req.response().end();
});
startServer(testAddress);
NetClient client = vertx.createNetClient();
client.connect(testAddress).onComplete(onSuccess(so -> {
so.write(
"PUT / HTTP/1.1 \r\n" +
"connection: close\r\n" +
"content-length: 0\r\n" +
"\r\n" + "PUT / HTTP/1.1 \r\n" +
"content-length: 0\r\n" +
"\r\n");
Buffer response = Buffer.buffer();
so.handler(response::appendBuffer);
so.closeHandler(v -> {
String s = response.toString();
String predicate = "HTTP/1.1 200 OK";
assertEquals(s.indexOf(predicate), s.lastIndexOf("HTTP/1.1 200 OK"));
testComplete();
});
}));
await();
}

@Test
public void testKeepAlive() throws Exception {
testKeepAlive(true, 5, 10, 5);
Expand Down Expand Up @@ -2391,6 +2462,7 @@ private void recursiveCall(HttpClient client, AtomicInteger receivedRequests, in
});
}

@Ignore
@Test
public void testUnsupportedHttpVersion() throws Exception {
testUnsupported("GET /someuri HTTP/1.7\r\nHost: localhost\r\n\r\n", false);
Expand Down

0 comments on commit 17dd4d1

Please sign in to comment.