From 324483ccd49f996c664002837f1899f71ae40a2a Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Wed, 18 Mar 2015 15:55:22 -0700 Subject: [PATCH] UnicastContentSubject unsubscribes eagerly `UnicastContentSubject` delegates buffering to a `BufferUntilSubscriber` subject. However, it subscribes the downstream subscriber to this subject using `Observable.unsubscribe()`. This causes a `SafeSubscriber` subscribing to the buffer subject which unsubscribes on termination of the upstream source. The following code reproduces the issue: ```java UnicastContentSubject subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); subject.onNext(1l); subject.onCompleted(); subject.flatMap(aLong -> Observable.never() .doOnUnsubscribe(() -> System.out.println("unsubscribed"))) .toBlocking().toFuture().get(); ``` In the above code the flatmap returns `Observable.never()` which should not get unsubscribed, unless the eventual subscriber unsubscribes. Using `unsafeSubscribe()` to subscribe to the `BufferUntilSubscriber` will not eagerly unsubscribe. --- .../protocol/http/UnicastContentSubject.java | 8 +--- .../http/UnicastContentSubjectTest.java | 48 ++++++++++++++++++- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java index 31d4bd38..b7ad840b 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java +++ b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java @@ -156,11 +156,7 @@ private static final class State { private final Action0 onUnsubscribe; - public State() { - this(null); - } - - public State(Action0 onUnsubscribe) { + private State(Action0 onUnsubscribe) { this.onUnsubscribe = onUnsubscribe; final BufferUntilSubscriber bufferedSubject = BufferUntilSubscriber.create(); bufferedObservable = bufferedSubject.lift(new AutoReleaseByteBufOperator()); // Always auto-release @@ -223,7 +219,7 @@ public void call() { } })); - state.bufferedObservable.subscribe(subscriber); + state.bufferedObservable.unsafeSubscribe(subscriber); } else if(State.STATES.SUBSCRIBED.ordinal() == state.state) { subscriber.onError(new IllegalStateException("Content can only have one subscription. Use Observable.publish() if you want to multicast.")); diff --git a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java index db9c6d57..c4f232c3 100644 --- a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java +++ b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java @@ -20,9 +20,11 @@ import io.netty.buffer.Unpooled; import org.junit.Assert; import org.junit.Test; +import rx.Observable; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Actions; +import rx.functions.Func1; import rx.observers.Subscribers; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; @@ -30,7 +32,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -161,11 +165,53 @@ public void testByteBufReleaseWithTimeout() throws Exception { subject.onCompleted(); - testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); Assert.assertEquals("Byte buffer not fully released", 0, buffer.refCnt()); } + @Test + public void testUnsubscribe() throws Exception { + final TestScheduler testScheduler = Schedulers.test(); + UnicastContentSubject subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); + subject.onNext(1l); + subject.onCompleted(); + + final AtomicBoolean outerUnsubscribe = new AtomicBoolean(); + final AtomicBoolean innerUnsubscribe = new AtomicBoolean(); + final AtomicBoolean sourceCompleted = new AtomicBoolean(); + + subject.doOnCompleted(new Action0() { + @Override + public void call() { + sourceCompleted.set(true); + } + }).flatMap(new Func1>() { + @Override + public Observable call(Long aLong) { + return Observable.interval(1, TimeUnit.SECONDS, testScheduler) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + innerUnsubscribe.set(true); + } + }); + } + }).take(1).doOnUnsubscribe(new Action0() { + @Override + public void call() { + outerUnsubscribe.set(true); + } + }).toBlocking().toFuture(); // This subscribes to the subject + + Assert.assertTrue("Source did not complete on subscription.", sourceCompleted.get()); + Assert.assertFalse("Inner flatmap got unsubscribed on source completion.", innerUnsubscribe.get()); + + testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertTrue("Outer subscriber did not unsubscribe on inner completion.", outerUnsubscribe.get()); + Assert.assertTrue("Inner subscriber did not unsubscribe on inner completion.", innerUnsubscribe.get()); + } + private static class OnUnsubscribeAction implements Action0 { private volatile boolean called;