Skip to content

Commit

Permalink
Merge pull request #262 from NiteshKant/0.x
Browse files Browse the repository at this point in the history
Fixes for 0.3.16
  • Loading branch information
NiteshKant committed Oct 29, 2014
2 parents b67db14 + 73719ce commit 01dd5df
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 35 deletions.
15 changes: 15 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# RxNetty Releases #

### Version 0.3.16 ###

[Milestone](https://github.com/ReactiveX/RxNetty/issues?q=milestone%3A0.3.16+is%3Aclosed)

* [Issue 258] (https://github.com/Netflix/RxNetty/issues/258) Include port in the HTTP HOST header.
* [Issue 259] (https://github.com/Netflix/RxNetty/issues/259) Support native netty protocol as a runtime choice.
* [Issue 260] (https://github.com/Netflix/RxNetty/issues/260) Convert `RxEventLoopProvider` and `MetricEventsListenerFactory` to abstract classes.
* [Issue 261] (https://github.com/Netflix/RxNetty/issues/261) `ServerSentEventEncoder` is sub-optimal in using StringBuilder

##### Incompatible changes

* [Issue 260] (https://github.com/Netflix/RxNetty/issues/260) Convert `RxEventLoopProvider` and `MetricEventsListenerFactory` to abstract classes.

Artifacts: [Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxnetty%22%20AND%20v%3A%220.3.15%22)

### Version 0.3.15 ###

[Milestone](https://github.com/ReactiveX/RxNetty/issues?q=milestone%3A0.3.15+is%3Aclosed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* @author Nitesh Kant
*/
public class ServoEventsListenerFactory implements MetricEventsListenerFactory {
public class ServoEventsListenerFactory extends MetricEventsListenerFactory {

private final String clientMetricNamePrefix;
private final String serverMetricNamePrefix;
Expand Down
1 change: 1 addition & 0 deletions rx-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tasks.withType(Javadoc).each {

dependencies {
compile "io.netty:netty-codec-http:${netty_version}"
compile "io.netty:netty-transport-native-epoll:${netty_version}"
compile "io.reactivex:rxjava:${rxjava_version}"

testCompile 'junit:junit:4.10'
Expand Down
51 changes: 51 additions & 0 deletions rx-netty/src/main/java/io/reactivex/netty/RxNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ public final class RxNetty {

private static final CompositeHttpClient<ByteBuf, ByteBuf> globalClient =
new CompositeHttpClientBuilder<ByteBuf, ByteBuf>().withMaxConnections(DEFAULT_MAX_CONNECTIONS).build();

private static MetricEventsListenerFactory metricEventsListenerFactory;

private static volatile boolean usingNativeTransport;

private RxNetty() {
}

Expand Down Expand Up @@ -256,6 +259,54 @@ public static RxEventLoopProvider getRxEventLoopProvider() {
return rxEventLoopProvider;
}

/**
* A global flag to start using netty's <a href="https://github.com/netty/netty/wiki/Native-transports">native protocol</a>
* if applicable for a client or server.
*
* <b>This does not evaluate whether the native transport is available for the OS or not.</b>
*
* So, this method should be called conditionally when the caller is sure that the OS supports the native protocol.
*
* Alternatively, this can be done selectively per client and server instance by doing the following:
*
* <h2>Http Server</h2>
<pre>
* {@code
RxNetty.newHttpServerBuilder(8888, new RequestHandler<Object, Object>() {
@Override
public Observable<Void> handle(HttpServerRequest<Object> request, HttpServerResponse<Object> response) {
return null;
}
}).channel(EpollServerSocketChannel.class)
.eventLoop(new EpollEventLoopGroup());
}
</pre>
*
* <h2>Http Client</h2>
*
<pre>
{@code
RxNetty.newHttpClientBuilder("localhost", 8888)
.channel(EpollSocketChannel.class)
.eventloop(new EpollEventLoopGroup());
}
</pre>
*/
public static void useNativeTransportIfApplicable() {
usingNativeTransport = true;
}

/**
* A global flag to disable the effects of calling {@link #useNativeTransportIfApplicable()}
*/
public static void disableNativeTransport() {
usingNativeTransport = false;
}

public static boolean isUsingNativeTransport() {
return usingNativeTransport;
}

private static RxClient.ServerInfo getServerInfoFromRequest(HttpClientRequest<ByteBuf> request)
throws URISyntaxException {
URI uri = new URI(request.getUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@
*
* @author Nitesh Kant
*/
public interface RxEventLoopProvider {
public abstract class RxEventLoopProvider {

/**
* The {@link EventLoopGroup} to be used by all {@link RxClient} instances if it is not explicitly provided using
* {@link ClientBuilder#eventloop(EventLoopGroup)}.
*
* @return The {@link EventLoopGroup} to be used for all clients.
*/
EventLoopGroup globalClientEventLoop();
public abstract EventLoopGroup globalClientEventLoop();

/**
* The {@link EventLoopGroup} to be used by all {@link RxServer} instances if it is not explicitly provided using
* {@link ServerBuilder#eventLoop(EventLoopGroup)} or {@link ServerBuilder#eventLoops(EventLoopGroup, EventLoopGroup)} .
*
* @return The {@link EventLoopGroup} to be used for all servers.
*/
EventLoopGroup globalServerEventLoop();
public abstract EventLoopGroup globalServerEventLoop();

/**
* The {@link EventLoopGroup} to be used by all {@link RxServer} instances as a parent eventloop group
Expand All @@ -54,5 +54,40 @@ public interface RxEventLoopProvider {
*
* @return The {@link EventLoopGroup} to be used for all servers.
*/
EventLoopGroup globalServerParentEventLoop();
public abstract EventLoopGroup globalServerParentEventLoop();

/**
* The {@link EventLoopGroup} to be used by all {@link RxClient} instances if it is not explicitly provided using
* {@link ClientBuilder#eventloop(EventLoopGroup)}.
*
* @param nativeTransport {@code true} If the eventloop for native transport is to be returned (if configured)
*
* @return The {@link EventLoopGroup} to be used for all client. If {@code nativeTransport} was {@code true} then
* return the {@link EventLoopGroup} for native transport.
*/
public abstract EventLoopGroup globalClientEventLoop(boolean nativeTransport);

/**
* The {@link EventLoopGroup} to be used by all {@link RxServer} instances if it is not explicitly provided using
* {@link ServerBuilder#eventLoop(EventLoopGroup)} or {@link ServerBuilder#eventLoops(EventLoopGroup, EventLoopGroup)} .
*
* @param nativeTransport {@code true} If the eventloop for native transport is to be returned (if configured)
*
* @return The {@link EventLoopGroup} to be used for all servers. If {@code nativeTransport} was {@code true} then
* return the {@link EventLoopGroup} for native transport. *
*/
public abstract EventLoopGroup globalServerEventLoop(boolean nativeTransport);

/**
* The {@link EventLoopGroup} to be used by all {@link RxServer} instances as a parent eventloop group
* (First argument to this method: {@link io.netty.bootstrap.ServerBootstrap#group(EventLoopGroup, EventLoopGroup)}),
* if it is not explicitly provided using {@link ServerBuilder#eventLoop(EventLoopGroup)} or
* {@link ServerBuilder#eventLoops(EventLoopGroup, EventLoopGroup)}.
*
* @param nativeTransport {@code true} If the eventloop for native transport is to be returned (if configured)
*
* @return The {@link EventLoopGroup} to be used for all servers. If {@code nativeTransport} was {@code true} then
* return the {@link EventLoopGroup} for native transport.
*/
public abstract EventLoopGroup globalServerParentEventLoop(boolean nativeTransport);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@
package io.reactivex.netty.channel;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.reactivex.netty.RxNetty;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* An implementation of {@link RxEventLoopProvider} that returns the same {@link EventLoopGroup} instance for both
* client and server.
*
* @author Nitesh Kant
*/
public class SingleNioLoopProvider implements RxEventLoopProvider {
public class SingleNioLoopProvider extends RxEventLoopProvider {

private final SharedNioEventLoopGroup eventLoop;
private final SharedNioEventLoopGroup parentEventLoop;
private final AtomicReference<EpollEventLoopGroup> nativeEventLoop;
private final AtomicReference<EpollEventLoopGroup> nativeParentEventLoop;
private final int parentEventLoopCount;
private final int childEventLoopCount;

public SingleNioLoopProvider() {
this(Runtime.getRuntime().availableProcessors());
Expand All @@ -41,11 +48,18 @@ public SingleNioLoopProvider() {
public SingleNioLoopProvider(int threadCount) {
eventLoop = new SharedNioEventLoopGroup(threadCount);
parentEventLoop = eventLoop;
parentEventLoopCount = childEventLoopCount = threadCount;
nativeEventLoop = new AtomicReference<EpollEventLoopGroup>();
nativeParentEventLoop = nativeEventLoop;
}

public SingleNioLoopProvider(int parentEventLoopCount, int childEventLoopCount) {
eventLoop = new SharedNioEventLoopGroup(childEventLoopCount);
this.parentEventLoopCount = parentEventLoopCount;
this.childEventLoopCount = childEventLoopCount;
parentEventLoop = new SharedNioEventLoopGroup(parentEventLoopCount);
eventLoop = new SharedNioEventLoopGroup(childEventLoopCount);
nativeParentEventLoop = new AtomicReference<EpollEventLoopGroup>();
nativeEventLoop = new AtomicReference<EpollEventLoopGroup>();
}

@Override
Expand All @@ -65,6 +79,56 @@ public EventLoopGroup globalServerParentEventLoop() {
return parentEventLoop;
}

@Override
public EventLoopGroup globalClientEventLoop(boolean nativeTransport) {
if (nativeTransport && RxNetty.isUsingNativeTransport()) {
return getNativeEventLoop();
}
return globalClientEventLoop();
}

@Override
public EventLoopGroup globalServerEventLoop(boolean nativeTransport) {
if (nativeTransport && RxNetty.isUsingNativeTransport()) {
return getNativeEventLoop();
}
return globalServerEventLoop();
}

@Override
public EventLoopGroup globalServerParentEventLoop(boolean nativeTransport) {
if (nativeTransport && RxNetty.isUsingNativeTransport()) {
return getNativeParentEventLoop();
}
return globalServerParentEventLoop();
}

private EpollEventLoopGroup getNativeParentEventLoop() {
if (nativeParentEventLoop == nativeEventLoop) { // Means using same event loop for acceptor and worker pool.
return getNativeEventLoop();
}

EpollEventLoopGroup eventLoopGroup = nativeParentEventLoop.get();
if (null == eventLoopGroup) {
EpollEventLoopGroup newEventLoopGroup = new EpollEventLoopGroup(parentEventLoopCount);
if (!nativeParentEventLoop.compareAndSet(null, newEventLoopGroup)) {
newEventLoopGroup.shutdownGracefully();
}
}
return nativeParentEventLoop.get();
}

private EpollEventLoopGroup getNativeEventLoop() {
EpollEventLoopGroup eventLoopGroup = nativeEventLoop.get();
if (null == eventLoopGroup) {
EpollEventLoopGroup newEventLoopGroup = new EpollEventLoopGroup(childEventLoopCount);
if (!nativeEventLoop.compareAndSet(null, newEventLoopGroup)) {
newEventLoopGroup.shutdownGracefully();
}
}
return nativeEventLoop.get();
}

public static class SharedNioEventLoopGroup extends NioEventLoopGroup {

private final AtomicInteger refCount = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
Expand Down Expand Up @@ -230,15 +231,15 @@ public MetricEventsSubject<ClientMetricsEvent<?>> getEventsSubject() {

public C build() {
if (null == socketChannel) {
socketChannel = NioSocketChannel.class;
socketChannel = defaultSocketChannelClass();
if (null == eventLoopGroup) {
eventLoopGroup = RxNetty.getRxEventLoopProvider().globalClientEventLoop();
eventLoopGroup = defaultEventloop(socketChannel);
}
}

if (null == eventLoopGroup) {
if (NioSocketChannel.class == socketChannel) {
eventLoopGroup = RxNetty.getRxEventLoopProvider().globalClientEventLoop();
if (defaultSocketChannelClass() == socketChannel) {
eventLoopGroup = defaultEventloop(socketChannel);
} else {
// Fail fast for defaults we do not support.
throw new IllegalStateException("Specified a channel class but not the event loop group.");
Expand All @@ -263,6 +264,14 @@ public C build() {
return client;
}

protected EventLoopGroup defaultEventloop(Class<? extends Channel> socketChannel) {
return RxNetty.getRxEventLoopProvider().globalClientEventLoop();
}

protected Class<? extends SocketChannel> defaultSocketChannelClass() {
return NioSocketChannel.class;
}

protected abstract C createClient();

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
package io.reactivex.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.MetricEventsListener;
import io.reactivex.netty.metrics.MetricEventsListenerFactory;
Expand Down Expand Up @@ -64,6 +69,19 @@ protected String generatedNamePrefix() {
return "TcpClient-";
}

@Override
protected Class<? extends SocketChannel> defaultSocketChannelClass() {
if (RxNetty.isUsingNativeTransport()) {
return EpollSocketChannel.class;
}
return super.defaultSocketChannelClass();
}

@Override
protected EventLoopGroup defaultEventloop(Class<? extends Channel> socketChannel) {
return RxNetty.getRxEventLoopProvider().globalClientEventLoop(true); // get native eventloop if configured.
}

@Override
protected MetricEventsListener<? extends ClientMetricsEvent<ClientMetricsEvent.EventType>>
newMetricsListener(MetricEventsListenerFactory factory, RxClient<I, O> client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,23 @@
*
* @author Nitesh Kant
*/
public interface MetricEventsListenerFactory {
public abstract class MetricEventsListenerFactory {

MetricEventsListener<ClientMetricsEvent<ClientMetricsEvent.EventType>> forTcpClient(
public abstract MetricEventsListener<ClientMetricsEvent<ClientMetricsEvent.EventType>> forTcpClient(
@SuppressWarnings("rawtypes") RxClient client);

MetricEventsListener<ClientMetricsEvent<?>> forHttpClient(@SuppressWarnings("rawtypes")HttpClient client);
public abstract MetricEventsListener<ClientMetricsEvent<?>> forHttpClient(@SuppressWarnings("rawtypes")HttpClient client);

MetricEventsListener<ClientMetricsEvent<?>> forWebSocketClient(@SuppressWarnings("rawtypes")WebSocketClient client);
public abstract MetricEventsListener<ClientMetricsEvent<?>> forWebSocketClient(@SuppressWarnings("rawtypes")WebSocketClient client);

MetricEventsListener<ClientMetricsEvent<?>> forUdpClient(@SuppressWarnings("rawtypes")UdpClient client);
public abstract MetricEventsListener<ClientMetricsEvent<?>> forUdpClient(@SuppressWarnings("rawtypes")UdpClient client);

MetricEventsListener<ServerMetricsEvent<ServerMetricsEvent.EventType>> forTcpServer(
public abstract MetricEventsListener<ServerMetricsEvent<ServerMetricsEvent.EventType>> forTcpServer(
@SuppressWarnings("rawtypes") RxServer server);

MetricEventsListener<ServerMetricsEvent<?>> forHttpServer(@SuppressWarnings("rawtypes")HttpServer server);
public abstract MetricEventsListener<ServerMetricsEvent<?>> forHttpServer(@SuppressWarnings("rawtypes")HttpServer server);

MetricEventsListener<ServerMetricsEvent<?>> forWebSocketServer(@SuppressWarnings("rawtypes")WebSocketServer server);
public abstract MetricEventsListener<ServerMetricsEvent<?>> forWebSocketServer(@SuppressWarnings("rawtypes")WebSocketServer server);

MetricEventsListener<ServerMetricsEvent<?>> forUdpServer(@SuppressWarnings("rawtypes")UdpServer server);
public abstract MetricEventsListener<ServerMetricsEvent<?>> forUdpServer(@SuppressWarnings("rawtypes")UdpServer server);
}
Loading

0 comments on commit 01dd5df

Please sign in to comment.