Skip to content

Commit

Permalink
Fixes for 0.3.18
Browse files Browse the repository at this point in the history
#269(`ServerSentEventDecoder` stops decoding after receiving an illegal field): Now ignoring all data till an EOL after receiving an unidentified field, in essence ignoring that event.
#270: HttpClient should only do relative redirects
#271: URI fragment must be inherited by the redirect location
#272(Guaranteeing sequential notifications from operators/Subjects): Using `SerializedSubject` in `ObservableConnection.getInput()` and `SerializedObserver` in `UnicastContentSubject`
#257 Support relative HTTP redirects
  • Loading branch information
Nitesh Kant committed Nov 5, 2014
1 parent 3ebf249 commit 53d285d
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
* An observable connection for connection oriented protocols.
Expand All @@ -36,7 +38,7 @@
*/
public class ObservableConnection<I, O> extends DefaultChannelWriter<O> {

private PublishSubject<I> inputSubject;
private Subject<I, I> inputSubject;
@SuppressWarnings("rawtypes")private final MetricEventsSubject eventsSubject;
private final ChannelMetricEventProvider metricEventProvider;
/* Guarded by closeIssued so that its only updated once*/ protected volatile long closeStartTimeMillis = -1;
Expand All @@ -46,7 +48,7 @@ protected ObservableConnection(final Channel channel, ChannelMetricEventProvider
super(channel, eventsSubject, metricEventProvider);
this.eventsSubject = eventsSubject;
this.metricEventProvider = metricEventProvider;
inputSubject = PublishSubject.create();
inputSubject = new SerializedSubject<I, I>(PublishSubject.<I>create());
}

public Observable<I> getInput() {
Expand Down Expand Up @@ -94,7 +96,7 @@ public Observable<Void> close() {

@Override
protected Observable<Void> _close(boolean flush) {
final PublishSubject<I> thisSubject = inputSubject;
final Subject<I, I> thisSubject = inputSubject;
ReadTimeoutPipelineConfigurator.disableReadTimeout(getChannel().pipeline());
if (flush) {
Observable<Void> toReturn = flush().lift(new Observable.Operator<Void, Void>() {
Expand Down Expand Up @@ -179,8 +181,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

protected void updateInputSubject(PublishSubject<I> newSubject) {
inputSubject = newSubject;
protected void updateInputSubject(Subject<I, I> newSubject) {
inputSubject = new SerializedSubject<I, I>(newSubject);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.internal.operators.NotificationLite;
import rx.observers.SerializedObserver;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subjects.Subject;
Expand Down Expand Up @@ -223,7 +224,7 @@ public boolean casState(STATES expected, STATES next) {
}

public void setObserverRef(Observer<? super T> o) { // Guarded by casState()
observerRef = o;
observerRef = new SerializedObserver<T>(o);
}

public boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.Observable;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import static io.reactivex.netty.protocol.http.client.HttpRedirectException.Reason.InvalidRedirect;
Expand Down Expand Up @@ -77,7 +78,13 @@ protected Observable<HttpClientResponse<O>> redirect(HttpClientRequest<I> redire
@Override
public boolean requiresRedirect(RedirectionContext context, HttpClientResponse<O> response) {
int statusCode = response.getStatus().code();
return Arrays.binarySearch(REDIRECTABLE_STATUS_CODES, statusCode) >= 0;
// This class only supports relative redirects as an HttpClient is always tied to a host:port combo and hence
// can not do an absolute redirect.
if (Arrays.binarySearch(REDIRECTABLE_STATUS_CODES, statusCode) >= 0) {
String location = extractRedirectLocation(response);
return !location.startsWith("http"); // Only process relative URIs: Issue https://github.com/ReactiveX/RxNetty/issues/270
}
return false;
}

@Override
Expand Down Expand Up @@ -107,33 +114,17 @@ public void validate(RedirectionContext context, HttpClientResponse<O> redirectR
+ location, e);
}

validateUri(location, redirectUri);

context.setNextRedirect(redirectUri);
}

protected String extractRedirectLocation(HttpClientResponse<O> redirectedResponse) {
return redirectedResponse.getHeaders().get(HttpHeaders.Names.LOCATION);
}

protected void validateUri(String location, URI redirectUri) {
if (!redirectUri.isAbsolute()) {
// Redirect URI must be absolute
throw new HttpRedirectException(InvalidRedirect,
String.format("Redirect URI %s must be absolute", location));
}

String host = redirectUri.getHost();
if (host == null) {
throw new HttpRedirectException(InvalidRedirect,
String.format("Location header %s does not contain a host name", location));
}
}

protected HttpClientRequest<I> createRedirectRequest(HttpClientRequest<I> original, URI redirectLocation,
int redirectStatus) {
HttpRequest nettyRequest = original.getNettyRequest();
nettyRequest.setUri(getNettyRequestUri(redirectLocation));
nettyRequest.setUri(getNettyRequestUri(redirectLocation, original.getUri(), redirectStatus));

HttpClientRequest<I> newRequest = new HttpClientRequest<I>(nettyRequest, original);

Expand All @@ -146,7 +137,7 @@ protected HttpClientRequest<I> createRedirectRequest(HttpClientRequest<I> origin
return newRequest;
}

private static String getNettyRequestUri(URI uri) {
protected static String getNettyRequestUri(URI uri, String originalUriString, int redirectStatus) {
StringBuilder sb = new StringBuilder();
if (uri.getRawPath() != null) {
sb.append(uri.getRawPath());
Expand All @@ -156,6 +147,20 @@ private static String getNettyRequestUri(URI uri) {
}
if (uri.getRawFragment() != null) {
sb.append('#').append(uri.getRawFragment());
} else if(redirectStatus >= 300) {
// http://tools.ietf.org/html/rfc7231#section-7.1.2 suggests that the URI fragment should be carried over to
// the redirect location if not exists in the redirect location.
// Issue: https://github.com/ReactiveX/RxNetty/issues/271
try {
URI originalUri = new URI(originalUriString);
if (originalUri.getRawFragment() != null) {
sb.append('#').append(originalUri.getRawFragment());
}
} catch (URISyntaxException e) {
logger.warn("Error parsing original request URI during redirect. " +
"This means that the path fragment if any in the original request will not be inherited " +
"by the redirect.", e);
}
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
*/
package io.reactivex.netty.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
* @author Nitesh Kant
*/
class DefaultErrorHandler implements ErrorHandler {

private static final Logger logger = LoggerFactory.getLogger(DefaultErrorHandler.class);

@Override
public Observable<Void> handleError(Throwable throwable) {
System.err.println("Unexpected error in RxNetty. Error: " + throwable.getMessage());
throwable.printStackTrace(System.err);
logger.error("Unexpected error in RxNetty.", throwable);
return Observable.error(throwable); // If we do not return an error observable then the error is swallowed.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import org.junit.BeforeClass;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;

import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
* @author Nitesh Kant
Expand Down Expand Up @@ -65,7 +68,7 @@ public void testTooManyRedirect() throws Throwable {
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config).enableWireLogging(LogLevel.ERROR)
.build();
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLimited?redirectsRequested=6&port=" + port));
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLimited?redirectsRequested=6"));
assertEquals("Hello world", content);
}

Expand All @@ -77,10 +80,41 @@ public void testRedirectLoop() throws Throwable {
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config).enableWireLogging(LogLevel.ERROR)
.build();
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLoop?redirectsRequested=6&port=" + port));
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirectLoop?redirectsRequested=6"));
assertEquals("Hello world", content);
}

@Test
public void testAbsoluteRedirect() throws Throwable {
HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null);
HttpClient.HttpClientConfig config = builder.readTimeout(20000, TimeUnit.MILLISECONDS)
.build();
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config).enableWireLogging(LogLevel.ERROR)
.build();
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("test/redirectAbsolute");
HttpClientResponse<ByteBuf> response = client.submit(request)
.map(new Func1<HttpClientResponse<ByteBuf>, HttpClientResponse<ByteBuf>>() {
@Override
public HttpClientResponse<ByteBuf> call(
HttpClientResponse<ByteBuf> response) {
response.ignoreContent();
return response;
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
System.out.println("HttpRedirectTest.call");
}
})
.toBlocking().toFuture().get(1, TimeUnit.MINUTES);
assertEquals("Unexpected response code.", HttpResponseStatus.MOVED_PERMANENTLY, response.getStatus());
String locationHeader = response.getHeaders().get("Location");
assertNotNull("Location header not found.", locationHeader);
assertTrue("Location was not absolute.", locationHeader.startsWith("http"));
}

@Test
public void testRedirectNoConnPool() throws Throwable {
HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null);
Expand All @@ -89,7 +123,7 @@ public void testRedirectNoConnPool() throws Throwable {
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config).enableWireLogging(LogLevel.ERROR)
.build();
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect?port=" + port));
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect"));
assertEquals("Hello world", content);
}

Expand All @@ -100,14 +134,14 @@ public void testRedirectWithConnPool() throws Throwable {
.build();
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config).enableWireLogging(LogLevel.ERROR).withMaxConnections(10).build();
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect?port=" + port));
String content = invokeBlockingCall(client, HttpClientRequest.createGet("test/redirect"));
assertEquals("Hello world", content);
}

@Test
public void testNoRedirect() {
HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null).setFollowRedirect(false);
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("test/redirect?port=" + port);
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("test/redirect");
HttpClient.HttpClientConfig config = builder.readTimeout(20000, TimeUnit.MILLISECONDS)
.build();
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
Expand All @@ -122,7 +156,7 @@ public void testNoRedirect() {
public void testRedirectPost() throws Throwable {
HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null);
HttpClient.HttpClientConfig config = builder.setFollowRedirect(true).build();
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("test/redirectPost?port=" + port)
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("test/redirectPost")
.withContent("Hello world");
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config)
Expand All @@ -135,7 +169,7 @@ public void testRedirectPost() throws Throwable {
public void testNoRedirectPost() {
HttpClient.HttpClientConfig.Builder builder = new HttpClient.HttpClientConfig.Builder(null);
HttpClient.HttpClientConfig config = builder.build();
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("test/redirectPost?port=" + port)
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("test/redirectPost")
.withContent("Hello world");
HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("localhost", port)
.config(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,39 @@ public Observable<Void> handleKeepAliveTimeout(final HttpServerResponse<ByteBuf>
return response.writeBytesAndFlush(responseBytes);
}

public Observable<Void> redirectGet(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
response.getHeaders().set("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/singleEntity");
public Observable<Void> redirectGet(final HttpServerResponse<ByteBuf> response) {
response.getHeaders().set("Location", "/test/singleEntity");
response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
return response.writeAndFlush(Unpooled.EMPTY_BUFFER);
}

public Observable<Void> redirectCustom(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
String port = request.getQueryParameters().get("port").get(0);
boolean isRedirectLoop = request.getUri().contains("redirectLoop");
int currentCount = getIntParamWithDefault(request, "count", 0);
int redirectsRequested = getIntParamWithDefault(request, "redirectsRequested", 1);
String location;
if (currentCount >= redirectsRequested) {
location = "http://localhost:" + port + "/test/singleEntity";
location = "/test/singleEntity";
} else {

location = "http://localhost:" + port
+ "/test/" + (isRedirectLoop ? "redirectLoop" : "redirectLimited" + redirectLoopUniqueIndex.incrementAndGet())
+ "?port=" + port + "&count=" + (currentCount + 1)
+ "&redirectsRequested=" + redirectsRequested;
location = "/test/" + (isRedirectLoop ? "redirectLoop"
: "redirectLimited" + redirectLoopUniqueIndex.incrementAndGet())
+ "?count=" + (currentCount + 1)
+ "&redirectsRequested=" + redirectsRequested;
}

response.getHeaders().set("Location", location);
response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
return response.writeAndFlush(Unpooled.EMPTY_BUFFER);
}

public Observable<Void> redirectPost(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
response.getHeaders().set("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/post");
public Observable<Void> redirectPost(final HttpServerResponse<ByteBuf> response) {
response.getHeaders().set("Location", "/test/post");
response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
return response.writeAndFlush(Unpooled.EMPTY_BUFFER);
}

public Observable<Void> redirectAbsolute(final HttpServerResponse<ByteBuf> response) {
response.getHeaders().set("Location", "http://localhost/test/post");
response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
return response.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
Expand Down Expand Up @@ -202,16 +206,18 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerRes
return handleCloseConnection(response);
} else if (uri.startsWith("test/keepAliveTimeout")) {
return handleKeepAliveTimeout(response);
} else if (uri.startsWith("test/redirectAbsolute")) {
return redirectAbsolute(response);
} else if (uri.startsWith("test/redirectInfinite")) {
return redirectCustom(request, response);
} else if (uri.startsWith("test/redirectLoop")) {
return redirectCustom(request, response);
} else if (uri.startsWith("test/redirectLimited")) {
return redirectCustom(request, response);
} else if (uri.startsWith("test/redirect") && request.getHttpMethod().equals(HttpMethod.GET)) {
return redirectGet(request, response);
return redirectGet(response);
} else if (uri.startsWith("test/redirectPost") && request.getHttpMethod().equals(HttpMethod.POST)) {
return redirectPost(request, response);
return redirectPost(response);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.flush();
Expand Down

0 comments on commit 53d285d

Please sign in to comment.