Skip to content

Commit

Permalink
Merge pull request #341 from NiteshKant/0.x
Browse files Browse the repository at this point in the history
UnicastContentSubject unsubscribes eagerly
  • Loading branch information
NiteshKant committed Mar 18, 2015
2 parents 1dd4e45 + 2a12eba commit 45d08ce
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ private static final class State<T> {
private final Action0 onUnsubscribe;
private volatile Subscription releaseSubscription;

public State() {
this(null);
}

public State(Action0 onUnsubscribe) {
private State(Action0 onUnsubscribe) {
this.onUnsubscribe = onUnsubscribe;
final BufferUntilSubscriber<T> bufferedSubject = BufferUntilSubscriber.create();
bufferedObservable = bufferedSubject.lift(new AutoReleaseByteBufOperator<T>()); // Always auto-release
Expand Down Expand Up @@ -235,7 +231,7 @@ public void call() {
}
}));

state.bufferedObservable.subscribe(subscriber);
state.bufferedObservable.unsafeSubscribe(subscriber);
state.unsubscribeReleaseSubscription();

} else if(State.STATES.SUBSCRIBED.ordinal() == state.state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@
import io.netty.buffer.Unpooled;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.functions.Func1;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -161,11 +165,53 @@ public void testByteBufReleaseWithTimeout() throws Exception {

subject.onCompleted();


testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
Assert.assertEquals("Byte buffer not fully released", 0, buffer.refCnt());
}

@Test
public void testUnsubscribe() throws Exception {
final TestScheduler testScheduler = Schedulers.test();
UnicastContentSubject<Long> subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();
subject.onNext(1l);
subject.onCompleted();

final AtomicBoolean outerUnsubscribe = new AtomicBoolean();
final AtomicBoolean innerUnsubscribe = new AtomicBoolean();
final AtomicBoolean sourceCompleted = new AtomicBoolean();

subject.doOnCompleted(new Action0() {
@Override
public void call() {
sourceCompleted.set(true);
}
}).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
innerUnsubscribe.set(true);
}
});
}
}).take(1).doOnUnsubscribe(new Action0() {
@Override
public void call() {
outerUnsubscribe.set(true);
}
}).toBlocking().toFuture(); // This subscribes to the subject

Assert.assertTrue("Source did not complete on subscription.", sourceCompleted.get());
Assert.assertFalse("Inner flatmap got unsubscribed on source completion.", innerUnsubscribe.get());

testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Assert.assertTrue("Outer subscriber did not unsubscribe on inner completion.", outerUnsubscribe.get());
Assert.assertTrue("Inner subscriber did not unsubscribe on inner completion.", innerUnsubscribe.get());
}

private static class OnUnsubscribeAction implements Action0 {

private volatile boolean called;
Expand Down

0 comments on commit 45d08ce

Please sign in to comment.