diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index 1d7450ad49..e65e18da80 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -56,12 +56,12 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell */ static final Object INITIAL = new Object(); /** - * If {@link #cancel()} is called, the first call to request(n) is invalid, or the terminal signal was already - * delivered. + * If {@link #cancel()} is called after subscrubed to the next Publisher. */ static final Object CANCELLED = new Object(); /** - * Cancelled after {@link #onSuccess(Object)} or terminal signal received (prevents duplicate terminals). + * If {@link #cancel()} is called before subscribed to the next Publisher, the first call to request(n) is + * invalid, or terminal signal received (prevents duplicate terminals). */ static final Object TERMINAL = new Object(); /** @@ -90,6 +90,8 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell this.propagateCancel = propagateCancel; } + abstract boolean deferSubscribe(); + @Override public final void onSubscribe(final Cancellable cancellable) { delayedCancellable(cancellable); @@ -142,7 +144,7 @@ private void onErrorPropagateCancel(Throwable t) { @Override public final void onComplete() { - if (propagateCancel) { + if (propagateCancel || !deferSubscribe()) { onCompletePropagateCancel(); } else { target.onComplete(); @@ -207,7 +209,7 @@ final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) { } } - private boolean finallyShouldSubscribeToNext(@Nullable Object oldState) { + private static boolean finallyShouldSubscribeToNext(@Nullable Object oldState) { return oldState != PUBLISHER_SUBSCRIBED; } @@ -242,6 +244,11 @@ private static final class ConcatSubscriber extends AbstractConcatSubscriber< super(target, next, propagateCancel); } + @Override + boolean deferSubscribe() { + return false; + } + @Override public void onSuccess(@Nullable final T result) { for (;;) { @@ -254,8 +261,12 @@ public void onSuccess(@Nullable final T result) { } break; } - } else if (oldValue == CANCELLED || oldValue == TERMINAL || - mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { + } else if (oldValue == TERMINAL) { + // This may happen only if returned Publisher was cancelled before Single terminates, subscribe to + // the next source to propagate cancellation. + next.subscribeInternal(this); + break; + } else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } @@ -327,6 +338,11 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu super(target, next, propagateCancel); } + @Override + boolean deferSubscribe() { + return true; + } + @Override public void onSuccess(@Nullable final T result) { for (;;) { diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java index 86492f81fd..99f6ae37b7 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java @@ -21,8 +21,11 @@ import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; class ConcatPublisherTest { @@ -67,4 +70,23 @@ void testSecondEmitsError() { assertThat(subscriber.takeOnNext(2), contains("Hello1", "Hello2")); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } + + @Test + void sourceCancel() { + Publisher p = first.concat(second); + toSource(p).subscribe(subscriber); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); + subscriber.awaitSubscription().cancel(); + + TestSubscription firstSubscription = new TestSubscription(); + first.onSubscribe(firstSubscription); + assertThat("Source subscription not cancelled.", firstSubscription.isCancelled(), is(true)); + assertThat("Next source subscribed on cancellation.", second.isSubscribed(), is(false)); + first.onComplete(); + + TestSubscription secondSubscription = new TestSubscription(); + second.onSubscribe(secondSubscription); + assertThat("Next source not subscribed.", second.isSubscribed(), is(true)); + assertThat("Next cancellable not cancelled.", secondSubscription.isCancelled(), is(true)); + } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java index 8ed046be8f..223c25c500 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java @@ -75,10 +75,16 @@ void testCancelSource() { toSource(source.concat(next)).subscribe(subscriber); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); - TestCancellable cancellable = new TestCancellable(); - source.onSubscribe(cancellable); - assertTrue(cancellable.isCancelled()); - assertFalse(next.isSubscribed()); + TestCancellable sourceCancellable = new TestCancellable(); + source.onSubscribe(sourceCancellable); + assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled."); + assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); + + source.onComplete(); + TestCancellable nextCancellable = new TestCancellable(); + next.onSubscribe(nextCancellable); + assertTrue(next.isSubscribed(), "Next source not subscribed."); + assertTrue(nextCancellable.isCancelled(), "Next source not cancelled."); } @Test diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java index de83733231..2e82112812 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java @@ -146,6 +146,7 @@ void cancelSource() { assertTrue(cancellable.isCancelled(), "Original completable not cancelled."); assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); triggerNextSubscribe(); + assertTrue(next.isSubscribed(), "Next source not subscribed."); assertTrue(subscription.isCancelled(), "Next source not cancelled."); } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java index 15ef15908e..4d9ff6ae04 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java @@ -74,10 +74,16 @@ void testSourceError() { void testCancelSource() { assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); - TestCancellable cancellable = new TestCancellable(); - source.onSubscribe(cancellable); - assertTrue(cancellable.isCancelled()); - assertFalse(next.isSubscribed()); + TestCancellable sourceCancellable = new TestCancellable(); + source.onSubscribe(sourceCancellable); + assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled."); + assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); + + source.onComplete(); + TestCancellable nextCancellable = new TestCancellable(); + next.onSubscribe(nextCancellable); + assertTrue(next.isSubscribed(), "Next source not subscribed."); + assertTrue(nextCancellable.isCancelled(), "Next source not cancelled."); } @Test diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java index 67e77251fb..dd20bc9ce2 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java @@ -109,6 +109,7 @@ void nextError(boolean propagateCancel) { @CsvSource(value = {"false,false", "false,true", "true,false", "true,true"}) void sourceCancel(boolean propagateCancel, boolean onError) { setup(propagateCancel); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true)); assertThat(completable.isSubscribed(), is(propagateCancel)); diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java index 845f326ace..cd5d0622d7 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java @@ -127,6 +127,7 @@ void nextError() { @Test void sourceCancel() { + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true)); assertThat("Next source subscribed on cancellation.", single.isSubscribed(), is(false)); diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java index 0c18226011..bd6a112af6 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java @@ -74,10 +74,16 @@ void testSourceError() { void testCancelSource() { assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); - TestCancellable cancellable = new TestCancellable(); - source.onSubscribe(cancellable); - assertTrue(cancellable.isCancelled()); - assertFalse(next.isSubscribed()); + TestCancellable sourceCancellable = new TestCancellable(); + source.onSubscribe(sourceCancellable); + assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled."); + assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); + + source.onSuccess(1); + TestCancellable nextCancellable = new TestCancellable(); + next.onSubscribe(nextCancellable); + assertTrue(next.isSubscribed(), "Next source not subscribed."); + assertTrue(nextCancellable.isCancelled(), "Next source not cancelled."); } @Test diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index 1e339620bb..33658e5a65 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -112,10 +112,9 @@ private static Collection onNextErrorPropagatedParams() { return args; } - @ParameterizedTest(name = "mode={0} requestN={2} singleCompletesFirst={3}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} requestN={1} singleCompletesFirst={2}") @MethodSource("onNextErrorPropagatedParams") - void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst) - throws Exception { + void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst) throws Exception { toSource((mode == PROPAGATE_CANCEL ? source.concatPropagateCancel(next) : source.concat(next, mode == DEFER_SUBSCRIBE)) @@ -141,7 +140,7 @@ void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void bothCompletion(ConcatMode mode) { setUp(mode); @@ -155,7 +154,7 @@ void bothCompletion(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void sourceCompletionNextError(ConcatMode mode) { setUp(mode); @@ -166,7 +165,7 @@ void sourceCompletionNextError(ConcatMode mode) { assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @ParameterizedTest(name = "mode={0} invalidRequestN={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") void invalidRequestNBeforeNextSubscribe(ConcatMode mode, long invalidRequestN) { setUp(mode); @@ -175,7 +174,7 @@ void invalidRequestNBeforeNextSubscribe(ConcatMode mode, long invalidRequestN) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void invalidRequestNWithInlineSourceCompletion(ConcatMode mode) { toSource(mode == PROPAGATE_CANCEL ? @@ -185,7 +184,7 @@ void invalidRequestNWithInlineSourceCompletion(ConcatMode mode) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void invalidRequestAfterNextSubscribe(ConcatMode mode) { setUp(mode); @@ -194,7 +193,7 @@ void invalidRequestAfterNextSubscribe(ConcatMode mode) { assertThat("Invalid request-n not propagated.", subscription.requested(), is(lessThan(0L))); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void multipleInvalidRequest(ConcatMode mode) { setUp(mode); @@ -203,7 +202,7 @@ void multipleInvalidRequest(ConcatMode mode) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0} invalidRequestN={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") void invalidThenValidRequest(ConcatMode mode, long invalidRequestN) { setUp(mode); @@ -213,7 +212,7 @@ void invalidThenValidRequest(ConcatMode mode, long invalidRequestN) { assertThat(cancellable.isCancelled(), is(true)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void request0PropagatedAfterSuccess(ConcatMode mode) { setUp(mode); @@ -227,7 +226,7 @@ void request0PropagatedAfterSuccess(ConcatMode mode) { is(true)); } - @ParameterizedTest(name = "mode={0} error={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} error={1}") @MethodSource("modeAndError") void sourceError(ConcatMode mode, boolean error) throws InterruptedException { setUp(mode); @@ -249,30 +248,25 @@ void sourceError(ConcatMode mode, boolean error) throws InterruptedException { } } - @ParameterizedTest(name = "mode={0} error={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} error={1}") @MethodSource("modeAndError") void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { setUp(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); - Subscription subscription1 = subscriber.awaitSubscription(); - subscription1.request(2); - subscription1.cancel(); + subscriber.awaitSubscription().cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - if (error) { - source.onError(DELIBERATE_EXCEPTION); - } else { - source.onSuccess(1); - } - if (mode == PROPAGATE_CANCEL) { - next.awaitSubscribed(); + assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); next.onSubscribe(subscription); - subscription.awaitCancelled(); + assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); if (error) { + source.onError(DELIBERATE_EXCEPTION); next.onError(new DeliberateException()); } else { + subscriber.awaitSubscription().request(1); + source.onSuccess(1); next.onComplete(); } @@ -280,11 +274,28 @@ void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { // thread safety on the subscriber and to avoid duplicate terminals. assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); } else { - assertThat(next.isSubscribed(), is(false)); + assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + assertThat("Next source cancelled unexpectedly.", subscription.isCancelled(), is(false)); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); if (error) { + source.onError(DELIBERATE_EXCEPTION); + assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } else { + subscriber.awaitSubscription().request(mode == DEFER_SUBSCRIBE ? 2 : 1); + source.onSuccess(1); + if (mode == CONCAT) { + assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); + next.onSubscribe(subscription); + assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); + next.onComplete(); + } else { + assertThat("Next source not subscribed.", next.isSubscribed(), is(false)); + assertThat("Next source not cancelled.", subscription.isCancelled(), is(false)); + } + // Demand is not expected to propagate after cancel. + assertThat("Unexpected next items.", subscriber.pollAllOnNext(), Matchers.empty()); // It is not required that no terminal is delivered after cancel but verifies the current implementation // for thread safety on the subscriber and to avoid duplicate terminals. assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); @@ -292,7 +303,7 @@ void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void cancelSourcePostRequest(ConcatMode mode) { setUp(mode); @@ -303,7 +314,7 @@ void cancelSourcePostRequest(ConcatMode mode) { assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(mode == PROPAGATE_CANCEL)); } - @ParameterizedTest(name = "mode={0} error={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} error={1}") @MethodSource("modeAndError") void cancelNext(ConcatMode mode, boolean error) { setUp(mode); @@ -323,7 +334,7 @@ void cancelNext(ConcatMode mode, boolean error) { } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void zeroIsNotRequestedOnTransitionToSubscription(ConcatMode mode) { setUp(mode); @@ -342,7 +353,7 @@ void zeroIsNotRequestedOnTransitionToSubscription(ConcatMode mode) { } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void publisherSubscribeBlockDemandMakesProgress(ConcatMode mode) { source = new TestSingle<>(); @@ -369,7 +380,7 @@ void publisherSubscribeBlockDemandMakesProgress(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void onErrorAfterInvalidRequestN(ConcatMode mode) { setUp(mode); @@ -390,7 +401,7 @@ void onErrorAfterInvalidRequestN(ConcatMode mode) { subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void singleCompletesWithNull(ConcatMode mode) { setUp(mode); @@ -403,7 +414,7 @@ void singleCompletesWithNull(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void demandAccumulatedBeforeSingleCompletes(ConcatMode mode) { setUp(mode); @@ -423,7 +434,7 @@ void demandAccumulatedBeforeSingleCompletes(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void requestOneThenMore(ConcatMode mode) { setUp(mode); @@ -441,7 +452,7 @@ void requestOneThenMore(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void reentryWithMoreDemand(ConcatMode mode) { List emitted = new ArrayList<>(); @@ -480,7 +491,7 @@ public void onComplete() { assertThat(completed[0], is(true)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void cancelledDuringFirstOnNext(ConcatMode mode) { List emitted = new ArrayList<>();