diff --git a/reactfx/src/main/java/org/reactfx/AccumulateBetweenStream.java b/reactfx/src/main/java/org/reactfx/AccumulateBetweenStream.java index 5804178..5dc5e96 100644 --- a/reactfx/src/main/java/org/reactfx/AccumulateBetweenStream.java +++ b/reactfx/src/main/java/org/reactfx/AccumulateBetweenStream.java @@ -4,7 +4,7 @@ import java.util.function.BiFunction; import java.util.function.Function; -class AccumulateBetweenStream extends LazilyBoundStream { +class AccumulateBetweenStream extends EventStreamBase { private final EventStream source; private final EventStream ticks; private final Function initialTransformation; diff --git a/reactfx/src/main/java/org/reactfx/AccumulateUntilLaterStream.java b/reactfx/src/main/java/org/reactfx/AccumulateUntilLaterStream.java index 538e063..66c945a 100644 --- a/reactfx/src/main/java/org/reactfx/AccumulateUntilLaterStream.java +++ b/reactfx/src/main/java/org/reactfx/AccumulateUntilLaterStream.java @@ -5,7 +5,7 @@ import java.util.function.BiFunction; import java.util.function.Function; -class AccumulateUntilLaterStream extends LazilyBoundStream { +class AccumulateUntilLaterStream extends EventStreamBase { private final EventStream source; private final Function initialTransformation; private final BiFunction accumulation; diff --git a/reactfx/src/main/java/org/reactfx/AccumulatingStream.java b/reactfx/src/main/java/org/reactfx/AccumulatingStream.java index 5daf205..0207ac0 100644 --- a/reactfx/src/main/java/org/reactfx/AccumulatingStream.java +++ b/reactfx/src/main/java/org/reactfx/AccumulatingStream.java @@ -3,7 +3,7 @@ import java.util.function.BiFunction; import java.util.function.Function; -class AccumulatingStream extends LazilyBoundStream { +class AccumulatingStream extends EventStreamBase { private final EventStream input; private final Function initialTransformation; private final BiFunction reduction; diff --git a/reactfx/src/main/java/org/reactfx/Await.java b/reactfx/src/main/java/org/reactfx/Await.java index 50d23c6..f30bc75 100644 --- a/reactfx/src/main/java/org/reactfx/Await.java +++ b/reactfx/src/main/java/org/reactfx/Await.java @@ -14,7 +14,7 @@ import org.reactfx.util.TriConsumer; import org.reactfx.util.Try; -class Await extends LazilyBoundStream implements AwaitingEventStream { +class Await extends EventStreamBase implements AwaitingEventStream { public static AwaitingEventStream awaitCompletionStage( EventStream> source, @@ -61,7 +61,7 @@ static void addCompletionHandler( t.addEventHandler(WORKER_STATE_CANCELLED, e -> handler.accept(null, null, true)); } - static TriConsumer, T, Throwable> reportingEmitter() { + static TriConsumer, T, Throwable> reportingEmitter() { return (stream, value, error) -> { if(error == null) { stream.emit(value); @@ -71,7 +71,7 @@ static TriConsumer, T, Throwable> reportingEmitter() { }; } - static TriConsumer>, T, Throwable> tryEmitter() { + static TriConsumer>, T, Throwable> tryEmitter() { return (stream, value, error) -> { if(error == null) { stream.emit(Try.success(value)); @@ -84,12 +84,12 @@ static TriConsumer>, T, Throwable> tryEmitter() { private final EventStream source; private final Indicator pending = new Indicator(); private final BiConsumer> addCompletionHandler; - private final TriConsumer, T, Throwable> emitter; + private final TriConsumer, T, Throwable> emitter; private Await( EventStream source, BiConsumer> addCompletionHandler, - TriConsumer, T, Throwable> emitter) { + TriConsumer, T, Throwable> emitter) { this.source = source; this.addCompletionHandler = addCompletionHandler; this.emitter = emitter; @@ -120,7 +120,7 @@ protected final Subscription subscribeToInputs() { } -class AwaitLatest extends LazilyBoundStream implements AwaitingEventStream { +class AwaitLatest extends EventStreamBase implements AwaitingEventStream { public static AwaitingEventStream awaitCompletionStage( EventStream> source, @@ -214,7 +214,7 @@ public static AwaitingEventStream> tryAwaitTask( private final EventStream cancelImpulse; private final Consumer canceller; private final BiConsumer> addCompletionHandler; - private final TriConsumer, T, Throwable> emitter; + private final TriConsumer, T, Throwable> emitter; private long revision = 0; private F expectedFuture = null; @@ -226,7 +226,7 @@ private AwaitLatest( EventStream cancelImpulse, Consumer canceller, BiConsumer> addCompletionHandler, - TriConsumer, T, Throwable> emitter) { + TriConsumer, T, Throwable> emitter) { this.source = source; this.cancelImpulse = cancelImpulse; this.canceller = canceller; diff --git a/reactfx/src/main/java/org/reactfx/ConnectableEventSource.java b/reactfx/src/main/java/org/reactfx/ConnectableEventSource.java index 78de738..255728e 100644 --- a/reactfx/src/main/java/org/reactfx/ConnectableEventSource.java +++ b/reactfx/src/main/java/org/reactfx/ConnectableEventSource.java @@ -4,7 +4,7 @@ public final class ConnectableEventSource -extends LazilyBoundStream +extends EventStreamBase implements ConnectableEventStream, ConnectableEventSink { private MapHelper, Subscription> subscriptions = null; diff --git a/reactfx/src/main/java/org/reactfx/DistinctStream.java b/reactfx/src/main/java/org/reactfx/DistinctStream.java index 7358386..61a27cc 100644 --- a/reactfx/src/main/java/org/reactfx/DistinctStream.java +++ b/reactfx/src/main/java/org/reactfx/DistinctStream.java @@ -3,7 +3,7 @@ import java.util.Objects; -class DistinctStream extends LazilyBoundStream { +class DistinctStream extends EventStreamBase { static final Object NONE = new Object(); private final EventStream input; private Object previous = NONE; diff --git a/reactfx/src/main/java/org/reactfx/EmitBothOnEachStream.java b/reactfx/src/main/java/org/reactfx/EmitBothOnEachStream.java index 3b5d637..c500fe3 100644 --- a/reactfx/src/main/java/org/reactfx/EmitBothOnEachStream.java +++ b/reactfx/src/main/java/org/reactfx/EmitBothOnEachStream.java @@ -4,7 +4,7 @@ import org.reactfx.util.Tuple2; -class EmitBothOnEachStream extends LazilyBoundStream> { +class EmitBothOnEachStream extends EventStreamBase> { private final EventStream source; private final EventStream impulse; diff --git a/reactfx/src/main/java/org/reactfx/EmitOnEachStream.java b/reactfx/src/main/java/org/reactfx/EmitOnEachStream.java index ce8f6fe..a74c9d8 100644 --- a/reactfx/src/main/java/org/reactfx/EmitOnEachStream.java +++ b/reactfx/src/main/java/org/reactfx/EmitOnEachStream.java @@ -1,7 +1,7 @@ package org.reactfx; -class EmitOnEachStream extends LazilyBoundStream { +class EmitOnEachStream extends EventStreamBase { private final EventStream source; private final EventStream impulse; diff --git a/reactfx/src/main/java/org/reactfx/EmitOnStream.java b/reactfx/src/main/java/org/reactfx/EmitOnStream.java index 3daf369..e3a902a 100644 --- a/reactfx/src/main/java/org/reactfx/EmitOnStream.java +++ b/reactfx/src/main/java/org/reactfx/EmitOnStream.java @@ -1,7 +1,7 @@ package org.reactfx; -class EmitOnStream extends LazilyBoundStream { +class EmitOnStream extends EventStreamBase { private final EventStream source; private final EventStream impulse; diff --git a/reactfx/src/main/java/org/reactfx/EventSource.java b/reactfx/src/main/java/org/reactfx/EventSource.java index 5bd68a5..b1996cf 100644 --- a/reactfx/src/main/java/org/reactfx/EventSource.java +++ b/reactfx/src/main/java/org/reactfx/EventSource.java @@ -17,4 +17,9 @@ public class EventSource public final void push(T value) { emit(value); } + + @Override + protected final Subscription subscribeToInputs() { + return Subscription.EMPTY; + } } diff --git a/reactfx/src/main/java/org/reactfx/EventStream.java b/reactfx/src/main/java/org/reactfx/EventStream.java index b2d0238..f9bbbb7 100644 --- a/reactfx/src/main/java/org/reactfx/EventStream.java +++ b/reactfx/src/main/java/org/reactfx/EventStream.java @@ -278,7 +278,7 @@ default EventStream flatMap(Function> */ default EventStream> or(EventStream right) { EventStream left = this; - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { return Subscription.multi( @@ -1294,7 +1294,7 @@ default EventStream guardedBy(Guardian... guardians) { * thrown by its subscribers. */ default EventStream> materializeErrors() { - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { return EventStream.this.subscribe( @@ -1313,7 +1313,7 @@ protected Subscription subscribeToInputs() { * by its subscribers. */ default EventStream handleErrors(Consumer handler) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { return EventStream.this.subscribe(this::emit, handler); @@ -1325,7 +1325,7 @@ protected Subscription subscribeToInputs() { * Returns a stream of errors reported by this event stream. */ default EventStream errors() { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { return EventStream.this.monitor(this::emit); diff --git a/reactfx/src/main/java/org/reactfx/EventStreamBase.java b/reactfx/src/main/java/org/reactfx/EventStreamBase.java index 6470588..875d683 100644 --- a/reactfx/src/main/java/org/reactfx/EventStreamBase.java +++ b/reactfx/src/main/java/org/reactfx/EventStreamBase.java @@ -1,9 +1,13 @@ package org.reactfx; +import java.util.function.Consumer; + /** + * Base class for event streams. + * Adds support for error propagation on top of {@link ObservableBase}. * - * @param type of events + * @param type of events emitted by this event stream. */ public abstract class EventStreamBase extends ObservableBase, T> @@ -56,6 +60,19 @@ protected final void reportError(Throwable thrown) { } } + /** + * Subscribes to the given event stream by the given subscriber and also + * forwards errors reported by the given stream to this stream. This is + * equivalent to {@code stream.subscribe(subscriber, this::reportError)}. + * @return subscription used to unsubscribe {@code subscriber} from + * {@code stream} and stop forwarding the errors. + */ + protected final Subscription subscribeTo( + EventStream stream, + Consumer subscriber) { + return stream.subscribe(subscriber, this::reportError); + } + @Override public final Subscription subscribe(Subscriber subscriber) { return observe(subscriber); diff --git a/reactfx/src/main/java/org/reactfx/EventStreams.java b/reactfx/src/main/java/org/reactfx/EventStreams.java index 350816c..a3b0515 100644 --- a/reactfx/src/main/java/org/reactfx/EventStreams.java +++ b/reactfx/src/main/java/org/reactfx/EventStreams.java @@ -57,7 +57,7 @@ public static EventStream never() { * of the given observable. */ public static EventStream invalidationsOf(Observable observable) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { InvalidationListener listener = obs -> emit(null); @@ -74,7 +74,7 @@ protected Subscription subscribeToInputs() { */ public static EventStream repeatOnInvalidation(O observable) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { InvalidationListener listener = obs -> emit(observable); @@ -95,7 +95,7 @@ protected void newObserver(Subscriber subscriber) { * every change. */ public static EventStream valuesOf(ObservableValue observable) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { ChangeListener listener = (obs, old, val) -> emit(val); @@ -111,7 +111,7 @@ protected void newObserver(Subscriber subscriber) { } public static EventStream nonNullValuesOf(ObservableValue observable) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { ChangeListener listener = (obs, old, val) -> { @@ -134,7 +134,7 @@ protected void newObserver(Subscriber subscriber) { } public static EventStream> changesOf(ObservableValue observable) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { ChangeListener listener = (obs, old, val) -> emit(new Change<>(old, val)); @@ -145,7 +145,7 @@ protected Subscription subscribeToInputs() { } public static EventStream> changesOf(ObservableList list) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { ListChangeListener listener = c -> emit(c); @@ -156,7 +156,7 @@ protected Subscription subscribeToInputs() { } public static EventStream> simpleChangesOf(ObservableList list) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { ListChangeListener listener = c -> { @@ -171,7 +171,7 @@ protected Subscription subscribeToInputs() { } public static EventStream> changesOf(ObservableSet set) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { SetChangeListener listener = c -> emit(c); @@ -182,7 +182,7 @@ protected Subscription subscribeToInputs() { } public static EventStream> changesOf(ObservableMap map) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { @Override protected Subscription subscribeToInputs() { MapChangeListener listener = c -> emit(c); @@ -201,7 +201,7 @@ public static EventStream sizeOf(ObservableMap map) { } private static EventStream create(Supplier computeValue, Observable... dependencies) { - return new LazilyBoundStream() { + return new EventStreamBase() { private T previousValue; @Override @@ -233,7 +233,7 @@ protected void newObserver(Subscriber subscriber) { } public static EventStream eventsOf(Node node, EventType eventType) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { EventHandler handler = event -> emit(event); @@ -244,7 +244,7 @@ protected Subscription subscribeToInputs() { } public static EventStream eventsOf(Scene scene, EventType eventType) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { EventHandler handler = event -> emit(event); @@ -264,7 +264,7 @@ protected Subscription subscribeToInputs() { * returned stream. */ public static EventStream ticks(Duration interval) { - return new LazilyBoundStream() { + return new EventStreamBase() { private final Timer timer = FxTimer.createPeriodic( interval, () -> emit(null)); @@ -293,7 +293,7 @@ public static EventStream ticks( Duration interval, ScheduledExecutorService scheduler, Executor eventThreadExecutor) { - return new LazilyBoundStream() { + return new EventStreamBase() { private final Timer timer = ScheduledExecutorServiceTimer.createPeriodic( interval, () -> emit(null), scheduler, eventThreadExecutor); @@ -314,7 +314,7 @@ protected Subscription subscribeToInputs() { */ @SafeVarargs public static EventStream merge(EventStream... inputs) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { return Subscription.multi(i -> subscribeTo(i, this::emit), inputs); @@ -331,7 +331,7 @@ protected Subscription subscribeToInputs() { */ public static EventStream merge( ObservableSet> set) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { return Subscription.dynamic(set, s -> subscribeTo(s, this::emit)); @@ -349,7 +349,7 @@ protected Subscription subscribeToInputs() { public static EventStream merge( ObservableSet set, Function> f) { - return new LazilyBoundStream() { + return new EventStreamBase() { @Override protected Subscription subscribeToInputs() { return Subscription.dynamic( @@ -360,7 +360,7 @@ protected Subscription subscribeToInputs() { } public static EventStream> zip(EventStream srcA, EventStream srcB) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new ExclusivePocket<>(); Pocket pocketB = new ExclusivePocket<>(); @@ -382,7 +382,7 @@ protected void tryEmit() { } public static EventStream> zip(EventStream srcA, EventStream srcB, EventStream srcC) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new ExclusivePocket<>(); Pocket pocketB = new ExclusivePocket<>(); Pocket pocketC = new ExclusivePocket<>(); @@ -409,7 +409,7 @@ protected void tryEmit() { public static EventStream> combine( EventStream srcA, EventStream srcB) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new Pocket<>(); Pocket pocketB = new Pocket<>(); @@ -434,7 +434,7 @@ public static EventStream> combine( EventStream srcA, EventStream srcB, EventStream srcC) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new Pocket<>(); Pocket pocketB = new Pocket<>(); Pocket pocketC = new Pocket<>(); @@ -463,7 +463,7 @@ public static EventStream> combine( EventStream srcB, EventStream srcC, EventStream srcD) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new Pocket<>(); Pocket pocketB = new Pocket<>(); Pocket pocketC = new Pocket<>(); @@ -499,7 +499,7 @@ public static EventStream> combine( EventStream srcC, EventStream srcD, EventStream srcE) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new Pocket<>(); Pocket pocketB = new Pocket<>(); Pocket pocketC = new Pocket<>(); @@ -540,7 +540,7 @@ public static EventStream> combine( EventStream srcD, EventStream srcE, EventStream srcF) { - return new LazilyBoundStream>() { + return new EventStreamBase>() { Pocket pocketA = new Pocket<>(); Pocket pocketB = new Pocket<>(); Pocket pocketC = new Pocket<>(); diff --git a/reactfx/src/main/java/org/reactfx/FilterMapStream.java b/reactfx/src/main/java/org/reactfx/FilterMapStream.java index 90b4878..b01bbd8 100644 --- a/reactfx/src/main/java/org/reactfx/FilterMapStream.java +++ b/reactfx/src/main/java/org/reactfx/FilterMapStream.java @@ -3,7 +3,7 @@ import java.util.function.Function; import java.util.function.Predicate; -class FilterMapStream extends LazilyBoundStream { +class FilterMapStream extends EventStreamBase { private final EventStream source; private final Predicate predicate; private final Function f; diff --git a/reactfx/src/main/java/org/reactfx/FilterStream.java b/reactfx/src/main/java/org/reactfx/FilterStream.java index e089191..86aeca1 100644 --- a/reactfx/src/main/java/org/reactfx/FilterStream.java +++ b/reactfx/src/main/java/org/reactfx/FilterStream.java @@ -2,7 +2,7 @@ import java.util.function.Predicate; -class FilterStream extends LazilyBoundStream { +class FilterStream extends EventStreamBase { private final EventStream source; private final Predicate predicate; diff --git a/reactfx/src/main/java/org/reactfx/FlatMapStream.java b/reactfx/src/main/java/org/reactfx/FlatMapStream.java index 96f29c5..8d00bc6 100644 --- a/reactfx/src/main/java/org/reactfx/FlatMapStream.java +++ b/reactfx/src/main/java/org/reactfx/FlatMapStream.java @@ -3,7 +3,7 @@ import java.util.Optional; import java.util.function.Function; -class FlatMapStream extends LazilyBoundStream { +class FlatMapStream extends EventStreamBase { private final EventStream source; private final Function> mapper; @@ -30,7 +30,7 @@ protected Subscription subscribeToInputs() { } } -class FlatMapOptStream extends LazilyBoundStream { +class FlatMapOptStream extends EventStreamBase { private final EventStream source; private final Function> mapper; diff --git a/reactfx/src/main/java/org/reactfx/GuardedStream.java b/reactfx/src/main/java/org/reactfx/GuardedStream.java index 2998285..82eb955 100644 --- a/reactfx/src/main/java/org/reactfx/GuardedStream.java +++ b/reactfx/src/main/java/org/reactfx/GuardedStream.java @@ -1,7 +1,7 @@ package org.reactfx; -class GuardedStream extends LazilyBoundStream { +class GuardedStream extends EventStreamBase { private final EventStream source; private final Guardian guardian; diff --git a/reactfx/src/main/java/org/reactfx/LazilyBoundStream.java b/reactfx/src/main/java/org/reactfx/LazilyBoundStream.java deleted file mode 100644 index f4668ce..0000000 --- a/reactfx/src/main/java/org/reactfx/LazilyBoundStream.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.reactfx; - -import java.util.function.Consumer; - -/** - * Event stream that has one or more sources (most commonly event streams, - * but not necessarily) to which it is subscribed only when it itself has - * at least one subscriber. - * - * @param type of events emitted by this event stream. - */ -public abstract class LazilyBoundStream -extends EventStreamBase { - private Subscription subscription = null; - - public LazilyBoundStream() { - super(); - } - - LazilyBoundStream(EmptyPendingNotifications, T> pn) { - super(pn); - } - - protected abstract Subscription subscribeToInputs(); - - @Override - protected final void firstObserver() { - runUnsafeAction(() -> subscription = subscribeToInputs()); - } - - @Override - protected final void noObservers() { - runUnsafeAction(() -> { - subscription.unsubscribe(); - subscription = null; - }); - } - - protected final boolean isBound() { - return subscription != null; - } - - /** - * Subscribes to the given event stream by the given subscriber and also - * forwards errors reported by the given stream to this stream. This is - * equivalent to {@code stream.subscribe(subscriber, this::reportError)}. - * @return subscription used to unsubscribe {@code subscriber} from - * {@code stream} and stop forwarding the errors. - */ - protected final Subscription subscribeTo( - EventStream stream, - Consumer subscriber) { - return stream.subscribe(subscriber, this::reportError); - } -} \ No newline at end of file diff --git a/reactfx/src/main/java/org/reactfx/MappedStream.java b/reactfx/src/main/java/org/reactfx/MappedStream.java index f0b974d..a626a7a 100644 --- a/reactfx/src/main/java/org/reactfx/MappedStream.java +++ b/reactfx/src/main/java/org/reactfx/MappedStream.java @@ -5,7 +5,7 @@ import javafx.concurrent.Task; -class MappedStream extends LazilyBoundStream { +class MappedStream extends EventStreamBase { private final EventStream input; private final Function f; diff --git a/reactfx/src/main/java/org/reactfx/ObservableBase.java b/reactfx/src/main/java/org/reactfx/ObservableBase.java index 01be95d..97ee5dc 100644 --- a/reactfx/src/main/java/org/reactfx/ObservableBase.java +++ b/reactfx/src/main/java/org/reactfx/ObservableBase.java @@ -6,12 +6,22 @@ import org.reactfx.util.ListHelper; /** + * Base class for observable objects. This abstract class implements: + *
    + *
  1. Observer management: adding and removing observers.
  2. + *
  3. Lazy binding to inputs. An observable has 0 or more inputs, + * most commonly, but not necessarily, other observables. Lazy binding to + * inputs means that the observable observes its inputs only when it is + * itself being observed.
  4. + *
  5. Observer notification.
  6. + *
* * @param type of the observer * @param type of observed values */ abstract class ObservableBase { private ListHelper observers = null; + private Subscription subscription = null; private PendingNotifications pendingNotifications; ObservableBase(EmptyPendingNotifications pendingNotifications) { @@ -22,9 +32,16 @@ public ObservableBase() { this(EmptyNonRecursivePN.empty()); } - protected final int getObserverCount() { - return ListHelper.size(observers); - } + /** + * Starts observing this observable's input(s), if any. + * This method is called when the number of observers goes from 0 to 1. + * This method is called before {@link #newObserver(Object)} + * is called for the first observer. + * @return subscription used to stop observing inputs. The subscription + * is unsubscribed (i.e. input observation stops) when the number of + * observers goes down to 0. + */ + protected abstract Subscription subscribeToInputs(); /** * Runs the given action. If {@code action} does not throw an exception, @@ -35,6 +52,14 @@ protected final int getObserverCount() { */ protected abstract boolean runUnsafeAction(Runnable action); + protected final boolean isBound() { + return subscription != null; + } + + protected final int getObserverCount() { + return ListHelper.size(observers); + } + protected final void notifyObservers(BiConsumer notifier, T event) { try { boolean added = runUnsafeAction(() -> { @@ -65,44 +90,22 @@ protected final void forEachObserver(Consumer action) { ListHelper.forEach(observers, o -> action.accept(o)); } - /** - * Called when the number of observers goes from 0 to 1. - * Overriding this method is a convenient way for subclasses - * to handle this event. - * - *

This method is called before the - * {@link #newObserver(Object)} method.

- */ - protected void firstObserver() { - // default implementation is empty - } - /** * Called for each new observer. * Overriding this method is a convenient way for subclasses * to handle this event, for example to publish some initial events. * *

This method is called after the - * {@link #firstObserver()} method.

+ * {@link #subscribeToInputs()} method.

*/ protected void newObserver(O observer) { // default implementation is empty } - /** - * Called when the number of observers goes down to 0. - * Overriding this method is a convenient way for subclasses - * to handle this event. - */ - protected void noObservers() { - // default implementation is empty - } - protected final Subscription observe(O observer) { - observers = ListHelper.add(observers, observer); if(ListHelper.size(observers) == 1) { - firstObserver(); + runUnsafeAction(() -> subscription = subscribeToInputs()); } newObserver(observer); @@ -111,8 +114,11 @@ protected final Subscription observe(O observer) { private void unobserve(O observer) { observers = ListHelper.remove(observers, observer); - if(ListHelper.isEmpty(observers)) { - noObservers(); + if(ListHelper.isEmpty(observers) && subscription != null) { + runUnsafeAction(() -> { + subscription.unsubscribe(); + subscription = null; + }); } } } diff --git a/reactfx/src/main/java/org/reactfx/RecursiveStream.java b/reactfx/src/main/java/org/reactfx/RecursiveStream.java index 3442864..880565c 100644 --- a/reactfx/src/main/java/org/reactfx/RecursiveStream.java +++ b/reactfx/src/main/java/org/reactfx/RecursiveStream.java @@ -1,7 +1,7 @@ package org.reactfx; -class RecursiveStream extends LazilyBoundStream { +class RecursiveStream extends EventStreamBase { private final EventStream input; public RecursiveStream( diff --git a/reactfx/src/main/java/org/reactfx/RepeatOnStream.java b/reactfx/src/main/java/org/reactfx/RepeatOnStream.java index fb2ff58..342227f 100644 --- a/reactfx/src/main/java/org/reactfx/RepeatOnStream.java +++ b/reactfx/src/main/java/org/reactfx/RepeatOnStream.java @@ -1,7 +1,7 @@ package org.reactfx; -class RepeatOnStream extends LazilyBoundStream { +class RepeatOnStream extends EventStreamBase { private final EventStream source; private final EventStream impulse; diff --git a/reactfx/src/main/java/org/reactfx/SideEffectStream.java b/reactfx/src/main/java/org/reactfx/SideEffectStream.java index 68b540c..60cda99 100644 --- a/reactfx/src/main/java/org/reactfx/SideEffectStream.java +++ b/reactfx/src/main/java/org/reactfx/SideEffectStream.java @@ -2,7 +2,7 @@ import java.util.function.Consumer; -class SideEffectStream extends LazilyBoundStream { +class SideEffectStream extends EventStreamBase { private final EventStream source; private final Consumer sideEffect; private boolean sideEffectInProgress = false; diff --git a/reactfx/src/main/java/org/reactfx/StateMachine.java b/reactfx/src/main/java/org/reactfx/StateMachine.java index 49652e9..5d34a43 100644 --- a/reactfx/src/main/java/org/reactfx/StateMachine.java +++ b/reactfx/src/main/java/org/reactfx/StateMachine.java @@ -120,7 +120,7 @@ public Binding toObservableState() { } } -class StateStream extends LazilyBoundStream { +class StateStream extends EventStreamBase { private final InputHandler[] inputHandlers; private S state; @@ -173,7 +173,7 @@ public EventStream toEventStream() { } } -class StatefulStream extends LazilyBoundStream { +class StatefulStream extends EventStreamBase { private final List inputHandlers; private S state; diff --git a/reactfx/src/main/java/org/reactfx/SuccessionReducingStream.java b/reactfx/src/main/java/org/reactfx/SuccessionReducingStream.java index cd55688..b71b71c 100644 --- a/reactfx/src/main/java/org/reactfx/SuccessionReducingStream.java +++ b/reactfx/src/main/java/org/reactfx/SuccessionReducingStream.java @@ -8,7 +8,7 @@ import org.reactfx.util.Timer; -class SuccessionReducingStream extends LazilyBoundStream implements AwaitingEventStream { +class SuccessionReducingStream extends EventStreamBase implements AwaitingEventStream { private final EventStream input; private final Function initial; private final BiFunction reduction; diff --git a/reactfx/src/main/java/org/reactfx/SuspendWhenStream.java b/reactfx/src/main/java/org/reactfx/SuspendWhenStream.java index a4a56ab..03e1cf1 100644 --- a/reactfx/src/main/java/org/reactfx/SuspendWhenStream.java +++ b/reactfx/src/main/java/org/reactfx/SuspendWhenStream.java @@ -2,7 +2,7 @@ import javafx.beans.value.ObservableValue; -class SuspendWhenStream extends LazilyBoundStream { +class SuspendWhenStream extends EventStreamBase { private final SuspendableEventStream source; private final ObservableValue condition; diff --git a/reactfx/src/main/java/org/reactfx/SuspendableEventStream.java b/reactfx/src/main/java/org/reactfx/SuspendableEventStream.java index 9808194..5eaf077 100644 --- a/reactfx/src/main/java/org/reactfx/SuspendableEventStream.java +++ b/reactfx/src/main/java/org/reactfx/SuspendableEventStream.java @@ -32,7 +32,7 @@ default EventStream suspendWhen(ObservableValue condition) { } abstract class SuspendableEventStreamBase -extends LazilyBoundStream +extends EventStreamBase implements SuspendableEventStream { private final EventStream source; diff --git a/reactfx/src/main/java/org/reactfx/ThenAccumulateForStream.java b/reactfx/src/main/java/org/reactfx/ThenAccumulateForStream.java index 6876ddf..276e16c 100644 --- a/reactfx/src/main/java/org/reactfx/ThenAccumulateForStream.java +++ b/reactfx/src/main/java/org/reactfx/ThenAccumulateForStream.java @@ -10,7 +10,7 @@ import org.reactfx.util.Timer; -class ThenAccumulateForStream extends LazilyBoundStream implements AwaitingEventStream { +class ThenAccumulateForStream extends EventStreamBase implements AwaitingEventStream { private static enum State { READY, ACC_NO_EVENT, ACC_HAS_EVENT } diff --git a/reactfx/src/main/java/org/reactfx/ThreadBridge.java b/reactfx/src/main/java/org/reactfx/ThreadBridge.java index 69d00b1..1e6f46e 100644 --- a/reactfx/src/main/java/org/reactfx/ThreadBridge.java +++ b/reactfx/src/main/java/org/reactfx/ThreadBridge.java @@ -3,7 +3,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -class ThreadBridge extends LazilyBoundStream { +class ThreadBridge extends EventStreamBase { private final EventStream input; private final Executor sourceThreadExecutor; private final Executor targetThreadExecutor;