From cf00ce39b3032fefc659c90d586e167e3f29239d Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Wed, 18 Feb 2015 13:33:29 +0100 Subject: [PATCH] Unsubscribe from auto release timeout if a subscriber comes along. If the first item gets emitted very quickly, the timer will basically always be fired. If the timeout is very long, then all those subscriptions will land in the old generation and very expensive to pick up. This changeset unsubscribes immediately if a subscriber comes along, making it more likely that the subscription is still in the young gen and quicker to clean up. --- .../protocol/http/UnicastContentSubject.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java index 31d4bd38..ab9a4b0c 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java +++ b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java @@ -24,6 +24,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscriber; +import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; import rx.internal.operators.BufferUntilSubscriber; @@ -155,6 +156,7 @@ public void updateTimeoutIfNotScheduled(long noSubscriptionTimeout, TimeUnit tim private static final class State { private final Action0 onUnsubscribe; + private volatile Subscription releaseSubscription; public State() { this(null); @@ -200,6 +202,16 @@ public boolean casState(STATES expected, STATES next) { public boolean casTimeoutScheduled() { return TIMEOUT_SCHEDULED_UPDATER.compareAndSet(this, 0, 1); } + + public void setReleaseSubscription(final Subscription releaseSubscription) { + this.releaseSubscription = releaseSubscription; + } + + public void unsubscribeReleaseSubscription() { + if(releaseSubscription != null) { + releaseSubscription.unsubscribe(); + } + } } private static final class OnSubscribeAction implements OnSubscribe { @@ -224,6 +236,7 @@ public void call() { })); state.bufferedObservable.subscribe(subscriber); + state.unsubscribeReleaseSubscription(); } else if(State.STATES.SUBSCRIBED.ordinal() == state.state) { subscriber.onError(new IllegalStateException("Content can only have one subscription. Use Observable.publish() if you want to multicast.")); @@ -278,12 +291,13 @@ public void onNext(T t) { // Schedule timeout once and when not subscribed yet. if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) { - timeoutScheduler.subscribe(new Action1() { // Schedule timeout after the first content arrives. + // Schedule timeout after the first content arrives. + state.setReleaseSubscription(timeoutScheduler.subscribe(new Action1() { @Override public void call(Long aLong) { disposeIfNotSubscribed(); } - }); + })); } }