diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java b/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java index 089bcf5f..ca894ff7 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java @@ -27,6 +27,8 @@ import rx.Observable; import rx.Subscriber; import rx.subjects.PublishSubject; +import rx.subjects.SerializedSubject; +import rx.subjects.Subject; /** * An observable connection for connection oriented protocols. @@ -36,7 +38,7 @@ */ public class ObservableConnection extends DefaultChannelWriter { - private PublishSubject inputSubject; + private Subject inputSubject; @SuppressWarnings("rawtypes")private final MetricEventsSubject eventsSubject; private final ChannelMetricEventProvider metricEventProvider; /* Guarded by closeIssued so that its only updated once*/ protected volatile long closeStartTimeMillis = -1; @@ -46,7 +48,7 @@ protected ObservableConnection(final Channel channel, ChannelMetricEventProvider super(channel, eventsSubject, metricEventProvider); this.eventsSubject = eventsSubject; this.metricEventProvider = metricEventProvider; - inputSubject = PublishSubject.create(); + inputSubject = new SerializedSubject(PublishSubject.create()); } public Observable getInput() { @@ -94,7 +96,7 @@ public Observable close() { @Override protected Observable _close(boolean flush) { - final PublishSubject thisSubject = inputSubject; + final Subject thisSubject = inputSubject; ReadTimeoutPipelineConfigurator.disableReadTimeout(getChannel().pipeline()); if (flush) { Observable toReturn = flush().lift(new Observable.Operator() { @@ -179,8 +181,8 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } - protected void updateInputSubject(PublishSubject newSubject) { - inputSubject = newSubject; + protected void updateInputSubject(Subject newSubject) { + inputSubject = new SerializedSubject(newSubject); } } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java index d24eee52..ed9f835b 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java @@ -27,6 +27,7 @@ import rx.functions.Action0; import rx.functions.Action1; import rx.internal.operators.NotificationLite; +import rx.observers.SerializedObserver; import rx.observers.Subscribers; import rx.schedulers.Schedulers; import rx.subjects.Subject; @@ -223,7 +224,7 @@ public boolean casState(STATES expected, STATES next) { } public void setObserverRef(Observer o) { // Guarded by casState() - observerRef = o; + observerRef = new SerializedObserver(o); } public boolean casObserverRef(Observer expected, Observer next) { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/DefaultRedirectHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/DefaultRedirectHandler.java index 9469784c..5c12c55e 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/DefaultRedirectHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/DefaultRedirectHandler.java @@ -24,6 +24,7 @@ import rx.Observable; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import static io.reactivex.netty.protocol.http.client.HttpRedirectException.Reason.InvalidRedirect; @@ -77,7 +78,13 @@ protected Observable> redirect(HttpClientRequest redire @Override public boolean requiresRedirect(RedirectionContext context, HttpClientResponse response) { int statusCode = response.getStatus().code(); - return Arrays.binarySearch(REDIRECTABLE_STATUS_CODES, statusCode) >= 0; + // This class only supports relative redirects as an HttpClient is always tied to a host:port combo and hence + // can not do an absolute redirect. + if (Arrays.binarySearch(REDIRECTABLE_STATUS_CODES, statusCode) >= 0) { + String location = extractRedirectLocation(response); + return !location.startsWith("http"); // Only process relative URIs: Issue https://github.com/ReactiveX/RxNetty/issues/270 + } + return false; } @Override @@ -107,8 +114,6 @@ public void validate(RedirectionContext context, HttpClientResponse redirectR + location, e); } - validateUri(location, redirectUri); - context.setNextRedirect(redirectUri); } @@ -116,24 +121,10 @@ protected String extractRedirectLocation(HttpClientResponse redirectedRespons return redirectedResponse.getHeaders().get(HttpHeaders.Names.LOCATION); } - protected void validateUri(String location, URI redirectUri) { - if (!redirectUri.isAbsolute()) { - // Redirect URI must be absolute - throw new HttpRedirectException(InvalidRedirect, - String.format("Redirect URI %s must be absolute", location)); - } - - String host = redirectUri.getHost(); - if (host == null) { - throw new HttpRedirectException(InvalidRedirect, - String.format("Location header %s does not contain a host name", location)); - } - } - protected HttpClientRequest createRedirectRequest(HttpClientRequest original, URI redirectLocation, int redirectStatus) { HttpRequest nettyRequest = original.getNettyRequest(); - nettyRequest.setUri(getNettyRequestUri(redirectLocation)); + nettyRequest.setUri(getNettyRequestUri(redirectLocation, original.getUri(), redirectStatus)); HttpClientRequest newRequest = new HttpClientRequest(nettyRequest, original); @@ -146,7 +137,7 @@ protected HttpClientRequest createRedirectRequest(HttpClientRequest origin return newRequest; } - private static String getNettyRequestUri(URI uri) { + protected static String getNettyRequestUri(URI uri, String originalUriString, int redirectStatus) { StringBuilder sb = new StringBuilder(); if (uri.getRawPath() != null) { sb.append(uri.getRawPath()); @@ -156,6 +147,20 @@ private static String getNettyRequestUri(URI uri) { } if (uri.getRawFragment() != null) { sb.append('#').append(uri.getRawFragment()); + } else if(redirectStatus >= 300) { + // http://tools.ietf.org/html/rfc7231#section-7.1.2 suggests that the URI fragment should be carried over to + // the redirect location if not exists in the redirect location. + // Issue: https://github.com/ReactiveX/RxNetty/issues/271 + try { + URI originalUri = new URI(originalUriString); + if (originalUri.getRawFragment() != null) { + sb.append('#').append(originalUri.getRawFragment()); + } + } catch (URISyntaxException e) { + logger.warn("Error parsing original request URI during redirect. " + + "This means that the path fragment if any in the original request will not be inherited " + + "by the redirect.", e); + } } return sb.toString(); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java index ba60d288..cd3fefe5 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java @@ -38,6 +38,13 @@ public class ServerSentEventDecoder extends ByteToMessageDecoder { private static final char[] DATA_FIELD_NAME = "data".toCharArray(); private static final char[] ID_FIELD_NAME = "id".toCharArray(); + protected static final ByteBufProcessor SKIP_TILL_LINE_DELIMITER_PROCESSOR = new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + return !isLineDelimiter((char) value); + } + }; + protected static final ByteBufProcessor SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR = new ByteBufProcessor() { @Override public boolean process(byte value) throws Exception { @@ -78,11 +85,10 @@ public boolean process(byte value) throws Exception { } } - private final int maxFieldNameLength; - private enum State { SkipColonAndWhiteSpaces,// Skip colon and all whitespaces after reading field name. SkipLineDelimitersAndSpaces,// Skip all line delimiters after field value end. + DiscardTillEOL,// On recieving an illegal/unidentified field, ignore everything till EOL. ReadFieldName, // Read till a colon to get the name of the field. ReadFieldValue // Read value till the line delimiter. } @@ -104,14 +110,6 @@ private enum State { private State state = State.ReadFieldName; - public ServerSentEventDecoder() { - this(DEFAULT_MAX_FIELD_LENGTH); - } - - public ServerSentEventDecoder(int maxFieldNameLength) { - this.maxFieldNameLength = maxFieldNameLength; - } - @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { @@ -125,34 +123,30 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t switch (state) { case SkipColonAndWhiteSpaces: - skipColonAndWhiteSapaces(in); - state = State.ReadFieldValue; + if (skipColonAndWhiteSpaces(in)) { + state = State.ReadFieldValue; + } break; case SkipLineDelimitersAndSpaces: - skipLineDelimiters(in); - state = State.ReadFieldName; + if (skipLineDelimiters(in)) { + state = State.ReadFieldName; + } + break; + case DiscardTillEOL: + if(skipTillEOL(in)) { + state = State.SkipLineDelimitersAndSpaces; + } break; case ReadFieldName: final int indexOfColon = scanAndFindColon(in); if (-1 == indexOfColon) { // No colon found - int bytesReceivedTillNow = null != incompleteData ? incompleteData.readableBytes() : - in.readableBytes() - readerIndexAtStart; - if (bytesReceivedTillNow > maxFieldNameLength) { // Reject as max field name length reached. - if (null != incompleteData) { - incompleteData.release(); - incompleteData = null; - } - throw new TooLongFieldNameException( - "Too long field name for a server sent event. Field name length received till now: " - + bytesReceivedTillNow); - } else { // Accumulate data into the field name buffer. - if (null == incompleteData) { - incompleteData = ctx.alloc().buffer(maxFieldNameLength, maxFieldNameLength); - } - // accumulate into incomplete data buffer to be used when the full data arrives. - incompleteData.writeBytes(in); + // Accumulate data into the field name buffer. + if (null == incompleteData) { + incompleteData = ctx.alloc().buffer(); } + // accumulate into incomplete data buffer to be used when the full data arrives. + incompleteData.writeBytes(in); } else { int fieldNameLengthInTheCurrentBuffer = indexOfColon - readerIndexAtStart; @@ -165,7 +159,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } else { // Consume the data from the input buffer. fieldNameBuffer = ctx.alloc().buffer(fieldNameLengthInTheCurrentBuffer, - fieldNameLengthInTheCurrentBuffer); + fieldNameLengthInTheCurrentBuffer); in.readBytes(fieldNameBuffer, fieldNameLengthInTheCurrentBuffer); } @@ -173,6 +167,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t try { currentFieldType = readCurrentFieldTypeFromBuffer(fieldNameBuffer); } finally { + if (null == currentFieldType) { + state = State.DiscardTillEOL; // Ignore this event completely. + } fieldNameBuffer.release(); } } @@ -276,8 +273,7 @@ private static ServerSentEvent.Type readCurrentFieldTypeFromBuffer(final ByteBuf toReturn = ServerSentEvent.Type.Id; break; default: - throw new IllegalArgumentException("Illegal Server Sent event field name: " - + fieldNameBuffer.toString(sseEncodingCharset)); + return null; } } else { if (++actualFieldNameIndexToCheck >= fieldNameToVerify.length || charAtI != fieldNameToVerify[actualFieldNameIndexToCheck]) { @@ -294,8 +290,7 @@ private static ServerSentEvent.Type readCurrentFieldTypeFromBuffer(final ByteBuf if (verified) { return toReturn; } else { - throw new IllegalArgumentException("Illegal Server Sent event field name: " - + fieldNameBuffer.toString(sseEncodingCharset)); + return null; } } @@ -321,21 +316,27 @@ protected static int scanAndFindEndOfLine(ByteBuf byteBuf) { return byteBuf.forEachByte(SCAN_EOL_PROCESSOR); } - protected static void skipLineDelimiters(ByteBuf byteBuf) { - skipTillMatching(byteBuf, SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR); + protected static boolean skipLineDelimiters(ByteBuf byteBuf) { + return skipTillMatching(byteBuf, SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR); } - protected static void skipColonAndWhiteSapaces(ByteBuf byteBuf) { - skipTillMatching(byteBuf, SKIP_COLON_AND_WHITE_SPACE_PROCESSOR); + protected static boolean skipColonAndWhiteSpaces(ByteBuf byteBuf) { + return skipTillMatching(byteBuf, SKIP_COLON_AND_WHITE_SPACE_PROCESSOR); } - protected static void skipTillMatching(ByteBuf byteBuf, ByteBufProcessor processor) { - int lastIndexProcessed = byteBuf.forEachByte(processor); + private static boolean skipTillEOL(ByteBuf in) { + return skipTillMatching(in, SKIP_TILL_LINE_DELIMITER_PROCESSOR); + } + + protected static boolean skipTillMatching(ByteBuf byteBuf, ByteBufProcessor processor) { + final int lastIndexProcessed = byteBuf.forEachByte(processor); if (-1 == lastIndexProcessed) { byteBuf.readerIndex(byteBuf.readerIndex() + byteBuf.readableBytes()); // If all the remaining bytes are to be ignored, discard the buffer. } else { byteBuf.readerIndex(lastIndexProcessed); } + + return -1 != lastIndexProcessed; } protected static boolean isLineDelimiter(char c) { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/TooLongFieldNameException.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/TooLongFieldNameException.java deleted file mode 100644 index 4e19a018..00000000 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/TooLongFieldNameException.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivex.netty.protocol.http.sse; - -import io.netty.handler.codec.DecoderException; - -/** - * Exception when the field name of a Server Sent Event is more than - * - * @author Nitesh Kant - */ -public class TooLongFieldNameException extends DecoderException { - - private static final long serialVersionUID = 5592673637644375829L; - - public TooLongFieldNameException() { - } - - public TooLongFieldNameException(String message, Throwable cause) { - super(message, cause); - } - - public TooLongFieldNameException(String message) { - super(message); - } - - public TooLongFieldNameException(Throwable cause) { - super(cause); - } -} diff --git a/rx-netty/src/main/java/io/reactivex/netty/server/DefaultErrorHandler.java b/rx-netty/src/main/java/io/reactivex/netty/server/DefaultErrorHandler.java index 39abbced..6c115cac 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/server/DefaultErrorHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/server/DefaultErrorHandler.java @@ -15,6 +15,8 @@ */ package io.reactivex.netty.server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; /** @@ -22,10 +24,11 @@ */ class DefaultErrorHandler implements ErrorHandler { + private static final Logger logger = LoggerFactory.getLogger(DefaultErrorHandler.class); + @Override public Observable handleError(Throwable throwable) { - System.err.println("Unexpected error in RxNetty. Error: " + throwable.getMessage()); - throwable.printStackTrace(System.err); + logger.error("Unexpected error in RxNetty.", throwable); return Observable.error(throwable); // If we do not return an error observable then the error is swallowed. } } diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpRedirectTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpRedirectTest.java index 2a180ca4..b6d699b9 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpRedirectTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpRedirectTest.java @@ -26,6 +26,7 @@ import org.junit.BeforeClass; import org.junit.Test; import rx.Observable; +import rx.functions.Action0; import rx.functions.Func1; import java.nio.charset.Charset; @@ -33,6 +34,8 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * @author Nitesh Kant @@ -65,7 +68,7 @@ public void testTooManyRedirect() throws Throwable { HttpClient client = new HttpClientBuilder("localhost", port) .config(config).enableWireLogging(LogLevel.ERROR) .build(); - String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLimited?redirectsRequested=6&port=" + port)); + String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLimited?redirectsRequested=6")); assertEquals("Hello world", content); } @@ -77,10 +80,41 @@ public void testRedirectLoop() throws Throwable { HttpClient client = new HttpClientBuilder("localhost", port) .config(config).enableWireLogging(LogLevel.ERROR) .build(); - String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLoop?redirectsRequested=6&port=" + port)); + String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLoop?redirectsRequested=6")); assertEquals("Hello world", content); } + @Test + public void testAbsoluteRedirect() throws Throwable { + HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null); + HttpClient.HttpClientConfig config = builder.readTimeout(20000, TimeUnit.MILLISECONDS) + .build(); + HttpClient client = new HttpClientBuilder("localhost", port) + .config(config).enableWireLogging(LogLevel.ERROR) + .build(); + HttpClientRequest request = HttpClientRequest.createGet("test/redirectAbsolute"); + HttpClientResponse response = client.submit(request) + .map(new Func1, HttpClientResponse>() { + @Override + public HttpClientResponse call( + HttpClientResponse response) { + response.ignoreContent(); + return response; + } + }) + .doOnCompleted(new Action0() { + @Override + public void call() { + System.out.println("HttpRedirectTest.call"); + } + }) + .toBlocking().toFuture().get(1, TimeUnit.MINUTES); + assertEquals("Unexpected response code.", HttpResponseStatus.MOVED_PERMANENTLY, response.getStatus()); + String locationHeader = response.getHeaders().get("Location"); + assertNotNull("Location header not found.", locationHeader); + assertTrue("Location was not absolute.", locationHeader.startsWith("http")); + } + @Test public void testRedirectNoConnPool() throws Throwable { HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null); @@ -89,7 +123,7 @@ public void testRedirectNoConnPool() throws Throwable { HttpClient client = new HttpClientBuilder("localhost", port) .config(config).enableWireLogging(LogLevel.ERROR) .build(); - String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect?port=" + port)); + String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect")); assertEquals("Hello world", content); } @@ -100,14 +134,14 @@ public void testRedirectWithConnPool() throws Throwable { .build(); HttpClient client = new HttpClientBuilder("localhost", port) .config(config).enableWireLogging(LogLevel.ERROR).withMaxConnections(10).build(); - String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect?port=" + port)); + String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect")); assertEquals("Hello world", content); } @Test public void testNoRedirect() { HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null).setFollowRedirect(false); - HttpClientRequest request = HttpClientRequest.createGet("test/redirect?port=" + port); + HttpClientRequest request = HttpClientRequest.createGet("test/redirect"); HttpClient.HttpClientConfig config = builder.readTimeout(20000, TimeUnit.MILLISECONDS) .build(); HttpClient client = new HttpClientBuilder("localhost", port) @@ -122,7 +156,7 @@ public void testNoRedirect() { public void testRedirectPost() throws Throwable { HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null); HttpClient.HttpClientConfig config = builder.setFollowRedirect(true).build(); - HttpClientRequest request = HttpClientRequest.createPost("test/redirectPost?port=" + port) + HttpClientRequest request = HttpClientRequest.createPost("test/redirectPost") .withContent("Hello world"); HttpClient client = new HttpClientBuilder("localhost", port) .config(config) @@ -135,7 +169,7 @@ public void testRedirectPost() throws Throwable { public void testNoRedirectPost() { HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null); HttpClient.HttpClientConfig config = builder.build(); - HttpClientRequest request = HttpClientRequest.createPost("test/redirectPost?port=" + port) + HttpClientRequest request = HttpClientRequest.createPost("test/redirectPost") .withContent("Hello world"); HttpClient client = new HttpClientBuilder("localhost", port) .config(config) diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java index daf2a67d..d4cce877 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RequestProcessor.java @@ -126,26 +126,24 @@ public Observable handleKeepAliveTimeout(final HttpServerResponse return response.writeBytesAndFlush(responseBytes); } - public Observable redirectGet(HttpServerRequest request, final HttpServerResponse response) { - response.getHeaders().set("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/singleEntity"); + public Observable redirectGet(final HttpServerResponse response) { + response.getHeaders().set("Location", "/test/singleEntity"); response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY); return response.writeAndFlush(Unpooled.EMPTY_BUFFER); } public Observable redirectCustom(HttpServerRequest request, final HttpServerResponse response) { - String port = request.getQueryParameters().get("port").get(0); boolean isRedirectLoop = request.getUri().contains("redirectLoop"); int currentCount = getIntParamWithDefault(request, "count", 0); int redirectsRequested = getIntParamWithDefault(request, "redirectsRequested", 1); String location; if (currentCount >= redirectsRequested) { - location = "http://localhost:" + port + "/test/singleEntity"; + location = "/test/singleEntity"; } else { - - location = "http://localhost:" + port - + "/test/" + (isRedirectLoop ? "redirectLoop" : "redirectLimited" + redirectLoopUniqueIndex.incrementAndGet()) - + "?port=" + port + "&count=" + (currentCount + 1) - + "&redirectsRequested=" + redirectsRequested; + location = "/test/" + (isRedirectLoop ? "redirectLoop" + : "redirectLimited" + redirectLoopUniqueIndex.incrementAndGet()) + + "?count=" + (currentCount + 1) + + "&redirectsRequested=" + redirectsRequested; } response.getHeaders().set("Location", location); @@ -153,8 +151,14 @@ public Observable redirectCustom(HttpServerRequest request, final return response.writeAndFlush(Unpooled.EMPTY_BUFFER); } - public Observable redirectPost(HttpServerRequest request, final HttpServerResponse response) { - response.getHeaders().set("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/post"); + public Observable redirectPost(final HttpServerResponse response) { + response.getHeaders().set("Location", "/test/post"); + response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY); + return response.writeAndFlush(Unpooled.EMPTY_BUFFER); + } + + public Observable redirectAbsolute(final HttpServerResponse response) { + response.getHeaders().set("Location", "http://localhost/test/post"); response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY); return response.writeAndFlush(Unpooled.EMPTY_BUFFER); } @@ -202,6 +206,8 @@ public Observable handle(HttpServerRequest request, HttpServerRes return handleCloseConnection(response); } else if (uri.startsWith("test/keepAliveTimeout")) { return handleKeepAliveTimeout(response); + } else if (uri.startsWith("test/redirectAbsolute")) { + return redirectAbsolute(response); } else if (uri.startsWith("test/redirectInfinite")) { return redirectCustom(request, response); } else if (uri.startsWith("test/redirectLoop")) { @@ -209,9 +215,9 @@ public Observable handle(HttpServerRequest request, HttpServerRes } else if (uri.startsWith("test/redirectLimited")) { return redirectCustom(request, response); } else if (uri.startsWith("test/redirect") && request.getHttpMethod().equals(HttpMethod.GET)) { - return redirectGet(request, response); + return redirectGet(response); } else if (uri.startsWith("test/redirectPost") && request.getHttpMethod().equals(HttpMethod.POST)) { - return redirectPost(request, response); + return redirectPost(response); } else { response.setStatus(HttpResponseStatus.NOT_FOUND); return response.flush(); diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java index b21e63ed..b8d0f90b 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java @@ -18,7 +18,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.DecoderException; import io.reactivex.netty.NoOpChannelHandlerContext; import org.junit.Test; @@ -30,6 +29,8 @@ import static io.reactivex.netty.protocol.http.sse.SseTestUtil.newSseProtocolString; import static io.reactivex.netty.protocol.http.sse.SseTestUtil.toByteBuf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * @author Tomasz Bak @@ -42,13 +43,6 @@ public class ServerSentEventDecoderTest { static class TestableServerSentEventDecoder extends ServerSentEventDecoder { - TestableServerSentEventDecoder() { - } - - TestableServerSentEventDecoder(int maxFieldNameLength) { - super(maxFieldNameLength); - } - @Override public void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { super.callDecode(ctx, in, out); @@ -190,28 +184,30 @@ public void testIncompleteFieldName() throws Exception { doTest("ata: data line\n", expected); } - @Test(expected = TooLongFieldNameException.class) - public void testMaxFieldLength() throws Exception { - TestableServerSentEventDecoder decoder = new TestableServerSentEventDecoder(2); - decoder.callDecode(ch, toByteBuf("eventt"), new ArrayList()); + @Test + public void testInvalidFieldNameAndNextEvent() throws Exception { + ArrayList out = new ArrayList(); + decoder.callDecode(ch, toByteBuf("eventt: event type\n"), out); + assertTrue("Output list not empty.", out.isEmpty()); + + decoder.callDecode(ch, toByteBuf("data: dumb \n"), out); + assertFalse("Event not emitted after invalid field name.", out.isEmpty()); + assertEquals("Unexpected event count after invalid field name.", 1, out.size()); + } - @Test(expected = IllegalArgumentException.class) + @Test public void testInvalidFieldName() throws Throwable { - try { - decoder.callDecode(ch, toByteBuf("eventt: dumb \n"), new ArrayList()); - } catch (DecoderException e) { - throw e.getCause(); - } + ArrayList out = new ArrayList(); + decoder.callDecode(ch, toByteBuf("eventt: dumb \n"), out); + assertTrue("Event emitted for invalid field name.", out.isEmpty()); } - @Test(expected = IllegalArgumentException.class) + @Test public void testFieldNameWithSpace() throws Throwable { - try { - decoder.callDecode(ch, toByteBuf("eve nt: dumb \n"), new ArrayList()); - } catch (DecoderException e) { - throw e.getCause(); - } + ArrayList out = new ArrayList(); + decoder.callDecode(ch, toByteBuf("eve nt: dumb \n"), new ArrayList()); + assertTrue("Event emitted for invalid field name.", out.isEmpty()); } @Test