diff --git a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java index a0db6bb5..fc1b27af 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java +++ b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java @@ -21,6 +21,7 @@ import io.reactivex.netty.protocol.http.client.HttpClientResponse; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import rx.Observable; +import rx.Observer; import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; @@ -138,7 +139,7 @@ public static UnicastContentSubject create(long noSubscriptionTimeout, Ti */ public boolean disposeIfNotSubscribed() { if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.DISPOSED)) { - state.bufferedSubject.subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released. + state.bufferedObservable.subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released. if (null != state.onUnsubscribe) { state.onUnsubscribe.call(); // Since this is an inline/sync call, if this throws an error, it gets thrown to the caller. @@ -165,6 +166,9 @@ public State() { public State(Action0 onUnsubscribe) { this.onUnsubscribe = onUnsubscribe; + final BufferUntilSubscriber bufferedSubject = BufferUntilSubscriber.create(); + bufferedObservable = bufferedSubject.lift(new AutoReleaseByteBufOperator()); // Always auto-release + bufferedObserver = bufferedSubject; } /** @@ -178,7 +182,8 @@ private enum STATES { private volatile int state = STATES.UNSUBSCRIBED.ordinal(); /*Values are the ordinals of STATES enum*/ - private final BufferUntilSubscriber bufferedSubject = BufferUntilSubscriber.create(); + private final Observer bufferedObserver; + private final Observable bufferedObservable; @SuppressWarnings("unused")private volatile int timeoutScheduled; // Boolean @@ -222,7 +227,7 @@ public void call() { } })); - state.bufferedSubject.lift(new AutoReleaseByteBufOperator()).subscribe(subscriber); + state.bufferedObservable.subscribe(subscriber); } else if(State.STATES.SUBSCRIBED.ordinal() == state.state) { subscriber.onError(new IllegalStateException("Content can only have one subscription. Use Observable.publish() if you want to multicast.")); @@ -230,42 +235,42 @@ public void call() { subscriber.onError(new IllegalStateException("Content stream is already disposed.")); } } + } - private class AutoReleaseByteBufOperator implements Operator { - @Override - public Subscriber call(final Subscriber subscriber) { - return new Subscriber() { - @Override - public void onCompleted() { - subscriber.onCompleted(); - } + private static class AutoReleaseByteBufOperator implements Operator { + @Override + public Subscriber call(final Subscriber subscriber) { + return new Subscriber() { + @Override + public void onCompleted() { + subscriber.onCompleted(); + } - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } - @Override - public void onNext(T t) { - try { - subscriber.onNext(t); - } finally { - ReferenceCountUtil.release(t); - } + @Override + public void onNext(I t) { + try { + subscriber.onNext(t); + } finally { + ReferenceCountUtil.release(t); } - }; - } + } + }; } } @Override public void onCompleted() { - state.bufferedSubject.onCompleted(); + state.bufferedObserver.onCompleted(); } @Override public void onError(Throwable e) { - state.bufferedSubject.onError(e); + state.bufferedObserver.onError(e); } @Override @@ -273,7 +278,7 @@ public void onNext(T t) { // Retain so that post-buffer, the ByteBuf does not get released. // Release will be done after reading from the subject. ReferenceCountUtil.retain(t); - state.bufferedSubject.onNext(t); + state.bufferedObserver.onNext(t); // Schedule timeout once and when not subscribed yet. if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) { @@ -288,6 +293,6 @@ public void call(Long aLong) { @Override public boolean hasObservers() { - return state.bufferedSubject.hasObservers(); + return state.state == State.STATES.SUBSCRIBED.ordinal(); } } diff --git a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java index 99681b22..ae354def 100644 --- a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java +++ b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 Netflix, Inc. + * Copyright 2015 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -128,7 +128,7 @@ public void call(String item) { } @Test - public void testByteBufRelease() throws Exception { + public void testByteBufReleaseWithNoTimeout() throws Exception { UnicastContentSubject subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); ByteBuf buffer = Unpooled.buffer(); Assert.assertEquals("Created byte buffer not retained.", 1, buffer.refCnt()); @@ -149,6 +149,24 @@ public void call(ByteBuf byteBuf) { Assert.assertEquals("Byte buffer not released.", 0, last.refCnt()); } + @Test + public void testByteBufReleaseWithTimeout() throws Exception { + TestScheduler testScheduler = Schedulers.test(); + UnicastContentSubject subject = UnicastContentSubject.create(100, TimeUnit.MILLISECONDS, + testScheduler); + ByteBuf buffer = Unpooled.buffer(); + + subject.onNext(buffer); + buffer.release(); // Simulatimg auto-release behavior in rxnetty today. (Issue: https://github.com/ReactiveX/RxNetty/issues/264) + Assert.assertEquals("Byte buffer not retained on buffering by subject.", 1, buffer.refCnt()); + + subject.onCompleted(); + + + testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + Assert.assertEquals("Byte buffer not fully released", 0, buffer.refCnt()); + } + private static class OnUnsubscribeAction implements Action0 { private volatile boolean called;