Skip to content

Commit

Permalink
Merge pull request #320 from NiteshKant/0.x
Browse files Browse the repository at this point in the history
Release RefCounted in UnicastContentSubject on dispose.
  • Loading branch information
NiteshKant committed Jan 28, 2015
2 parents 8ff93f6 + e9ae9a8 commit 85b5b7d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
Expand Down Expand Up @@ -138,7 +139,7 @@ public static <T> UnicastContentSubject<T> create(long noSubscriptionTimeout, Ti
*/
public boolean disposeIfNotSubscribed() {
if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.DISPOSED)) {
state.bufferedSubject.subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released.
state.bufferedObservable.subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released.

if (null != state.onUnsubscribe) {
state.onUnsubscribe.call(); // Since this is an inline/sync call, if this throws an error, it gets thrown to the caller.
Expand All @@ -165,6 +166,9 @@ public State() {

public State(Action0 onUnsubscribe) {
this.onUnsubscribe = onUnsubscribe;
final BufferUntilSubscriber<T> bufferedSubject = BufferUntilSubscriber.create();
bufferedObservable = bufferedSubject.lift(new AutoReleaseByteBufOperator<T>()); // Always auto-release
bufferedObserver = bufferedSubject;
}

/**
Expand All @@ -178,7 +182,8 @@ private enum STATES {

private volatile int state = STATES.UNSUBSCRIBED.ordinal(); /*Values are the ordinals of STATES enum*/

private final BufferUntilSubscriber<T> bufferedSubject = BufferUntilSubscriber.create();
private final Observer<T> bufferedObserver;
private final Observable<T> bufferedObservable;

@SuppressWarnings("unused")private volatile int timeoutScheduled; // Boolean

Expand Down Expand Up @@ -222,58 +227,58 @@ public void call() {
}
}));

state.bufferedSubject.lift(new AutoReleaseByteBufOperator()).subscribe(subscriber);
state.bufferedObservable.subscribe(subscriber);

} else if(State.STATES.SUBSCRIBED.ordinal() == state.state) {
subscriber.onError(new IllegalStateException("Content can only have one subscription. Use Observable.publish() if you want to multicast."));
} else if(State.STATES.DISPOSED.ordinal() == state.state) {
subscriber.onError(new IllegalStateException("Content stream is already disposed."));
}
}
}

private class AutoReleaseByteBufOperator implements Operator<T, T> {
@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
private static class AutoReleaseByteBufOperator<I> implements Operator<I, I> {
@Override
public Subscriber<? super I> call(final Subscriber<? super I> subscriber) {
return new Subscriber<I>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
try {
subscriber.onNext(t);
} finally {
ReferenceCountUtil.release(t);
}
@Override
public void onNext(I t) {
try {
subscriber.onNext(t);
} finally {
ReferenceCountUtil.release(t);
}
};
}
}
};
}
}

@Override
public void onCompleted() {
state.bufferedSubject.onCompleted();
state.bufferedObserver.onCompleted();
}

@Override
public void onError(Throwable e) {
state.bufferedSubject.onError(e);
state.bufferedObserver.onError(e);
}

@Override
public void onNext(T t) {
// Retain so that post-buffer, the ByteBuf does not get released.
// Release will be done after reading from the subject.
ReferenceCountUtil.retain(t);
state.bufferedSubject.onNext(t);
state.bufferedObserver.onNext(t);

// Schedule timeout once and when not subscribed yet.
if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
Expand All @@ -288,6 +293,6 @@ public void call(Long aLong) {

@Override
public boolean hasObservers() {
return state.bufferedSubject.hasObservers();
return state.state == State.STATES.SUBSCRIBED.ordinal();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 Netflix, Inc.
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,7 +128,7 @@ public void call(String item) {
}

@Test
public void testByteBufRelease() throws Exception {
public void testByteBufReleaseWithNoTimeout() throws Exception {
UnicastContentSubject<ByteBuf> subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();
ByteBuf buffer = Unpooled.buffer();
Assert.assertEquals("Created byte buffer not retained.", 1, buffer.refCnt());
Expand All @@ -149,6 +149,24 @@ public void call(ByteBuf byteBuf) {
Assert.assertEquals("Byte buffer not released.", 0, last.refCnt());
}

@Test
public void testByteBufReleaseWithTimeout() throws Exception {
TestScheduler testScheduler = Schedulers.test();
UnicastContentSubject<ByteBuf> subject = UnicastContentSubject.create(100, TimeUnit.MILLISECONDS,
testScheduler);
ByteBuf buffer = Unpooled.buffer();

subject.onNext(buffer);
buffer.release(); // Simulatimg auto-release behavior in rxnetty today. (Issue: https://github.com/ReactiveX/RxNetty/issues/264)
Assert.assertEquals("Byte buffer not retained on buffering by subject.", 1, buffer.refCnt());

subject.onCompleted();


testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
Assert.assertEquals("Byte buffer not fully released", 0, buffer.refCnt());
}

private static class OnUnsubscribeAction implements Action0 {

private volatile boolean called;
Expand Down

0 comments on commit 85b5b7d

Please sign in to comment.