Skip to content

Commit

Permalink
Merge pull request #334 from daschl/unsubscribe-subj
Browse files Browse the repository at this point in the history
Unsubscribe from auto release timeout if a subscriber comes along.
  • Loading branch information
NiteshKant committed Feb 19, 2015
2 parents fa51fc0 + cf00ce3 commit 1dd4e45
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.internal.operators.BufferUntilSubscriber;
Expand Down Expand Up @@ -155,6 +156,7 @@ public void updateTimeoutIfNotScheduled(long noSubscriptionTimeout, TimeUnit tim
private static final class State<T> {

private final Action0 onUnsubscribe;
private volatile Subscription releaseSubscription;

public State() {
this(null);
Expand Down Expand Up @@ -200,6 +202,16 @@ public boolean casState(STATES expected, STATES next) {
public boolean casTimeoutScheduled() {
return TIMEOUT_SCHEDULED_UPDATER.compareAndSet(this, 0, 1);
}

public void setReleaseSubscription(final Subscription releaseSubscription) {
this.releaseSubscription = releaseSubscription;
}

public void unsubscribeReleaseSubscription() {
if(releaseSubscription != null) {
releaseSubscription.unsubscribe();
}
}
}

private static final class OnSubscribeAction<T> implements OnSubscribe<T> {
Expand All @@ -224,6 +236,7 @@ public void call() {
}));

state.bufferedObservable.subscribe(subscriber);
state.unsubscribeReleaseSubscription();

} else if(State.STATES.SUBSCRIBED.ordinal() == state.state) {
subscriber.onError(new IllegalStateException("Content can only have one subscription. Use Observable.publish() if you want to multicast."));
Expand Down Expand Up @@ -278,12 +291,13 @@ public void onNext(T t) {

// Schedule timeout once and when not subscribed yet.
if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
// Schedule timeout after the first content arrives.
state.setReleaseSubscription(timeoutScheduler.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
disposeIfNotSubscribed();
}
});
}));
}
}

Expand Down

0 comments on commit 1dd4e45

Please sign in to comment.