Skip to content

Commit

Permalink
Add Publisher.onCompleteError (#2723)
Browse files Browse the repository at this point in the history
Motivation:
There are some scenarios where a stream terminating with
onComplete is not expected, and translating to an error
simplifies recovery.
  • Loading branch information
Scottmitch authored Oct 4, 2023
1 parent a726670 commit b576131
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import java.util.function.Supplier;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

final class OnCompleteErrorPublisher<T> extends AbstractSynchronousPublisherOperator<T, T> {
private final Supplier<? extends Throwable> errorSupplier;

OnCompleteErrorPublisher(final Publisher<T> original, final Supplier<? extends Throwable> errorSupplier) {
super(original);
this.errorSupplier = requireNonNull(errorSupplier);
}

@Override
public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {
return new OnCompleteErrorSubscriber<>(subscriber, errorSupplier);
}

private static final class OnCompleteErrorSubscriber<T> implements Subscriber<T> {
private final Subscriber<? super T> subscriber;
private final Supplier<? extends Throwable> errorSupplier;

private OnCompleteErrorSubscriber(final Subscriber<? super T> subscriber,
final Supplier<? extends Throwable> errorSupplier) {
this.subscriber = subscriber;
this.errorSupplier = errorSupplier;
}

@Override
public void onSubscribe(final Subscription subscription) {
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(@Nullable final T t) {
subscriber.onNext(t);
}

@Override
public void onError(final Throwable t) {
subscriber.onError(t);
}

@Override
public void onComplete() {
final Throwable cause;
try {
cause = errorSupplier.get();
} catch (Throwable cause2) {
subscriber.onError(cause2);
return;
}
if (cause == null) {
subscriber.onComplete();
} else {
subscriber.onError(cause);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,28 @@ public final <E extends Throwable> Publisher<T> onErrorReturn(
return onErrorReturn(type::isInstance, rawSupplier);
}

/**
* Transform this {@link Publisher}s {@link Subscriber#onComplete()} signal into
* {@link Subscriber#onError(Throwable)} signal (unless {@code null} error returned from {@code errorSupplier}).
* <p>
* This method provides a data transformation in sequential programming similar to:
* <pre>{@code
* List<T> results = resultOfThisPublisher();
* terminalOfThisPublisher();
* Throwable cause = errorSupplier.get()
* if (cause != null) {
* throw cause;
* }
* }</pre>
* @param errorSupplier returns the error to emit to {@link Subscriber#onError(Throwable)}. if the return value
* is {@code null} then complete with {@link Subscriber#onComplete()}.
* @return A {@link Publisher} which transform this {@link Publisher}s {@link Subscriber#onComplete()} signal into
* {@link Subscriber#onError(Throwable)} signal (unless {@code null} error returned from {@code errorSupplier}).
*/
public final Publisher<T> onCompleteError(final Supplier<? extends Throwable> errorSupplier) {
return new OnCompleteErrorPublisher<>(this, errorSupplier);
}

/**
* Transform errors emitted on this {@link Publisher} which match {@code predicate} into
* {@link Subscriber#onNext(Object)} then {@link Subscriber#onComplete()} signals (e.g. swallows the error).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;

final class OnCompleteErrorPublisherTest {
private final TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();

@Test
void errorPassThrough() {
toSource(Publisher.<Integer>failed(DELIBERATE_EXCEPTION)
.onCompleteError(() -> new IllegalStateException("shouldn't get here"))
).subscribe(subscriber);
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void nullCompletes() {
toSource(Publisher.<Integer>empty()
.onCompleteError(() -> null)
).subscribe(subscriber);
subscriber.awaitOnComplete();
}

@ParameterizedTest(name = "{displayName} [{index}] shouldThrow={0}")
@ValueSource(booleans = {false, true})
void completeToError(boolean shouldThrow) {
final DeliberateException thrown = new DeliberateException();
toSource(from(1)
.onCompleteError(() -> {
if (shouldThrow) {
throw thrown;
}
return DELIBERATE_EXCEPTION;
})
).subscribe(subscriber);
subscriber.awaitSubscription().request(1);
assertThat(subscriber.takeOnNext(), equalTo(1));
assertThat(subscriber.awaitOnError(), shouldThrow ? sameInstance(thrown) : sameInstance(DELIBERATE_EXCEPTION));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;

import org.testng.annotations.Test;

@Test
public class PublisherOnCompleteErrorTckTest extends AbstractPublisherOperatorTckTest<Integer> {
@Override
protected Publisher<Integer> composePublisher(final Publisher<Integer> publisher, final int elements) {
return publisher.onCompleteError(() -> null);
}
}

0 comments on commit b576131

Please sign in to comment.