Skip to content

Commit

Permalink
Merge pull request #273 from NiteshKant/0.x
Browse files Browse the repository at this point in the history
Fixes for 0.3.18
  • Loading branch information
NiteshKant committed Nov 5, 2014
2 parents 7796555 + 53d285d commit 7ecdd35
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
* An observable connection for connection oriented protocols.
Expand All @@ -36,7 +38,7 @@
*/
public class ObservableConnection<I, O> extends DefaultChannelWriter<O> {

private PublishSubject<I> inputSubject;
private Subject<I, I> inputSubject;
@SuppressWarnings("rawtypes")private final MetricEventsSubject eventsSubject;
private final ChannelMetricEventProvider metricEventProvider;
/* Guarded by closeIssued so that its only updated once*/ protected volatile long closeStartTimeMillis = -1;
Expand All @@ -46,7 +48,7 @@ protected ObservableConnection(final Channel channel, ChannelMetricEventProvider
super(channel, eventsSubject, metricEventProvider);
this.eventsSubject = eventsSubject;
this.metricEventProvider = metricEventProvider;
inputSubject = PublishSubject.create();
inputSubject = new SerializedSubject<I, I>(PublishSubject.<I>create());
}

public Observable<I> getInput() {
Expand Down Expand Up @@ -94,7 +96,7 @@ public Observable<Void> close() {

@Override
protected Observable<Void> _close(boolean flush) {
final PublishSubject<I> thisSubject = inputSubject;
final Subject<I, I> thisSubject = inputSubject;
ReadTimeoutPipelineConfigurator.disableReadTimeout(getChannel().pipeline());
if (flush) {
Observable<Void> toReturn = flush().lift(new Observable.Operator<Void, Void>() {
Expand Down Expand Up @@ -179,8 +181,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

protected void updateInputSubject(PublishSubject<I> newSubject) {
inputSubject = newSubject;
protected void updateInputSubject(Subject<I, I> newSubject) {
inputSubject = new SerializedSubject<I, I>(newSubject);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.internal.operators.NotificationLite;
import rx.observers.SerializedObserver;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subjects.Subject;
Expand Down Expand Up @@ -223,7 +224,7 @@ public boolean casState(STATES expected, STATES next) {
}

public void setObserverRef(Observer<? super T> o) { // Guarded by casState()
observerRef = o;
observerRef = new SerializedObserver<T>(o);
}

public boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.Observable;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import static io.reactivex.netty.protocol.http.client.HttpRedirectException.Reason.InvalidRedirect;
Expand Down Expand Up @@ -77,7 +78,13 @@ protected Observable<HttpClientResponse<O>> redirect(HttpClientRequest<I> redire
@Override
public boolean requiresRedirect(RedirectionContext context, HttpClientResponse<O> response) {
int statusCode = response.getStatus().code();
return Arrays.binarySearch(REDIRECTABLE_STATUS_CODES, statusCode) >= 0;
// This class only supports relative redirects as an HttpClient is always tied to a host:port combo and hence
// can not do an absolute redirect.
if (Arrays.binarySearch(REDIRECTABLE_STATUS_CODES, statusCode) >= 0) {
String location = extractRedirectLocation(response);
return !location.startsWith("http"); // Only process relative URIs: Issue https://github.com/ReactiveX/RxNetty/issues/270
}
return false;
}

@Override
Expand Down Expand Up @@ -107,33 +114,17 @@ public void validate(RedirectionContext context, HttpClientResponse<O> redirectR
+ location, e);
}

validateUri(location, redirectUri);

context.setNextRedirect(redirectUri);
}

protected String extractRedirectLocation(HttpClientResponse<O> redirectedResponse) {
return redirectedResponse.getHeaders().get(HttpHeaders.Names.LOCATION);
}

protected void validateUri(String location, URI redirectUri) {
if (!redirectUri.isAbsolute()) {
// Redirect URI must be absolute
throw new HttpRedirectException(InvalidRedirect,
String.format("Redirect URI %s must be absolute", location));
}

String host = redirectUri.getHost();
if (host == null) {
throw new HttpRedirectException(InvalidRedirect,
String.format("Location header %s does not contain a host name", location));
}
}

protected HttpClientRequest<I> createRedirectRequest(HttpClientRequest<I> original, URI redirectLocation,
int redirectStatus) {
HttpRequest nettyRequest = original.getNettyRequest();
nettyRequest.setUri(getNettyRequestUri(redirectLocation));
nettyRequest.setUri(getNettyRequestUri(redirectLocation, original.getUri(), redirectStatus));

HttpClientRequest<I> newRequest = new HttpClientRequest<I>(nettyRequest, original);

Expand All @@ -146,7 +137,7 @@ protected HttpClientRequest<I> createRedirectRequest(HttpClientRequest<I> origin
return newRequest;
}

private static String getNettyRequestUri(URI uri) {
protected static String getNettyRequestUri(URI uri, String originalUriString, int redirectStatus) {
StringBuilder sb = new StringBuilder();
if (uri.getRawPath() != null) {
sb.append(uri.getRawPath());
Expand All @@ -156,6 +147,20 @@ private static String getNettyRequestUri(URI uri) {
}
if (uri.getRawFragment() != null) {
sb.append('#').append(uri.getRawFragment());
} else if(redirectStatus >= 300) {
// http://tools.ietf.org/html/rfc7231#section-7.1.2 suggests that the URI fragment should be carried over to
// the redirect location if not exists in the redirect location.
// Issue: https://github.com/ReactiveX/RxNetty/issues/271
try {
URI originalUri = new URI(originalUriString);
if (originalUri.getRawFragment() != null) {
sb.append('#').append(originalUri.getRawFragment());
}
} catch (URISyntaxException e) {
logger.warn("Error parsing original request URI during redirect. " +
"This means that the path fragment if any in the original request will not be inherited " +
"by the redirect.", e);
}
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public class ServerSentEventDecoder extends ByteToMessageDecoder {
private static final char[] DATA_FIELD_NAME = "data".toCharArray();
private static final char[] ID_FIELD_NAME = "id".toCharArray();

protected static final ByteBufProcessor SKIP_TILL_LINE_DELIMITER_PROCESSOR = new ByteBufProcessor() {
@Override
public boolean process(byte value) throws Exception {
return !isLineDelimiter((char) value);
}
};

protected static final ByteBufProcessor SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR = new ByteBufProcessor() {
@Override
public boolean process(byte value) throws Exception {
Expand Down Expand Up @@ -78,11 +85,10 @@ public boolean process(byte value) throws Exception {
}
}

private final int maxFieldNameLength;

private enum State {
SkipColonAndWhiteSpaces,// Skip colon and all whitespaces after reading field name.
SkipLineDelimitersAndSpaces,// Skip all line delimiters after field value end.
DiscardTillEOL,// On recieving an illegal/unidentified field, ignore everything till EOL.
ReadFieldName, // Read till a colon to get the name of the field.
ReadFieldValue // Read value till the line delimiter.
}
Expand All @@ -104,14 +110,6 @@ private enum State {

private State state = State.ReadFieldName;

public ServerSentEventDecoder() {
this(DEFAULT_MAX_FIELD_LENGTH);
}

public ServerSentEventDecoder(int maxFieldNameLength) {
this.maxFieldNameLength = maxFieldNameLength;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

Expand All @@ -125,34 +123,30 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

switch (state) {
case SkipColonAndWhiteSpaces:
skipColonAndWhiteSapaces(in);
state = State.ReadFieldValue;
if (skipColonAndWhiteSpaces(in)) {
state = State.ReadFieldValue;
}
break;
case SkipLineDelimitersAndSpaces:
skipLineDelimiters(in);
state = State.ReadFieldName;
if (skipLineDelimiters(in)) {
state = State.ReadFieldName;
}
break;
case DiscardTillEOL:
if(skipTillEOL(in)) {
state = State.SkipLineDelimitersAndSpaces;
}
break;
case ReadFieldName:
final int indexOfColon = scanAndFindColon(in);

if (-1 == indexOfColon) { // No colon found
int bytesReceivedTillNow = null != incompleteData ? incompleteData.readableBytes() :
in.readableBytes() - readerIndexAtStart;
if (bytesReceivedTillNow > maxFieldNameLength) { // Reject as max field name length reached.
if (null != incompleteData) {
incompleteData.release();
incompleteData = null;
}
throw new TooLongFieldNameException(
"Too long field name for a server sent event. Field name length received till now: "
+ bytesReceivedTillNow);
} else { // Accumulate data into the field name buffer.
if (null == incompleteData) {
incompleteData = ctx.alloc().buffer(maxFieldNameLength, maxFieldNameLength);
}
// accumulate into incomplete data buffer to be used when the full data arrives.
incompleteData.writeBytes(in);
// Accumulate data into the field name buffer.
if (null == incompleteData) {
incompleteData = ctx.alloc().buffer();
}
// accumulate into incomplete data buffer to be used when the full data arrives.
incompleteData.writeBytes(in);
} else {
int fieldNameLengthInTheCurrentBuffer = indexOfColon - readerIndexAtStart;

Expand All @@ -165,14 +159,17 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
} else {
// Consume the data from the input buffer.
fieldNameBuffer = ctx.alloc().buffer(fieldNameLengthInTheCurrentBuffer,
fieldNameLengthInTheCurrentBuffer);
fieldNameLengthInTheCurrentBuffer);
in.readBytes(fieldNameBuffer, fieldNameLengthInTheCurrentBuffer);
}

state = State.SkipColonAndWhiteSpaces; // We have read the field name, next we should skip colon & WS.
try {
currentFieldType = readCurrentFieldTypeFromBuffer(fieldNameBuffer);
} finally {
if (null == currentFieldType) {
state = State.DiscardTillEOL; // Ignore this event completely.
}
fieldNameBuffer.release();
}
}
Expand Down Expand Up @@ -276,8 +273,7 @@ private static ServerSentEvent.Type readCurrentFieldTypeFromBuffer(final ByteBuf
toReturn = ServerSentEvent.Type.Id;
break;
default:
throw new IllegalArgumentException("Illegal Server Sent event field name: "
+ fieldNameBuffer.toString(sseEncodingCharset));
return null;
}
} else {
if (++actualFieldNameIndexToCheck >= fieldNameToVerify.length || charAtI != fieldNameToVerify[actualFieldNameIndexToCheck]) {
Expand All @@ -294,8 +290,7 @@ private static ServerSentEvent.Type readCurrentFieldTypeFromBuffer(final ByteBuf
if (verified) {
return toReturn;
} else {
throw new IllegalArgumentException("Illegal Server Sent event field name: "
+ fieldNameBuffer.toString(sseEncodingCharset));
return null;
}
}

Expand All @@ -321,21 +316,27 @@ protected static int scanAndFindEndOfLine(ByteBuf byteBuf) {
return byteBuf.forEachByte(SCAN_EOL_PROCESSOR);
}

protected static void skipLineDelimiters(ByteBuf byteBuf) {
skipTillMatching(byteBuf, SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR);
protected static boolean skipLineDelimiters(ByteBuf byteBuf) {
return skipTillMatching(byteBuf, SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR);
}

protected static void skipColonAndWhiteSapaces(ByteBuf byteBuf) {
skipTillMatching(byteBuf, SKIP_COLON_AND_WHITE_SPACE_PROCESSOR);
protected static boolean skipColonAndWhiteSpaces(ByteBuf byteBuf) {
return skipTillMatching(byteBuf, SKIP_COLON_AND_WHITE_SPACE_PROCESSOR);
}

protected static void skipTillMatching(ByteBuf byteBuf, ByteBufProcessor processor) {
int lastIndexProcessed = byteBuf.forEachByte(processor);
private static boolean skipTillEOL(ByteBuf in) {
return skipTillMatching(in, SKIP_TILL_LINE_DELIMITER_PROCESSOR);
}

protected static boolean skipTillMatching(ByteBuf byteBuf, ByteBufProcessor processor) {
final int lastIndexProcessed = byteBuf.forEachByte(processor);
if (-1 == lastIndexProcessed) {
byteBuf.readerIndex(byteBuf.readerIndex() + byteBuf.readableBytes()); // If all the remaining bytes are to be ignored, discard the buffer.
} else {
byteBuf.readerIndex(lastIndexProcessed);
}

return -1 != lastIndexProcessed;
}

protected static boolean isLineDelimiter(char c) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
*/
package io.reactivex.netty.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
* @author Nitesh Kant
*/
class DefaultErrorHandler implements ErrorHandler {

private static final Logger logger = LoggerFactory.getLogger(DefaultErrorHandler.class);

@Override
public Observable<Void> handleError(Throwable throwable) {
System.err.println("Unexpected error in RxNetty. Error: " + throwable.getMessage());
throwable.printStackTrace(System.err);
logger.error("Unexpected error in RxNetty.", throwable);
return Observable.error(throwable); // If we do not return an error observable then the error is swallowed.
}
}
Loading

0 comments on commit 7ecdd35

Please sign in to comment.