diff --git a/build.gradle b/build.gradle index 1e1c9ab..d5fb1bf 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,7 @@ dependencies { compileOnly "io.micronaut:micronaut-inject" compileOnly "io.micronaut.validation:micronaut-validation" compileOnly "io.micronaut:micronaut-jackson-databind" - compileOnly "io.micronaut.rxjava2:micronaut-rxjava2" + compileOnly "io.micronaut.reactor:micronaut-reactor" // kestra compileOnly group: "io.kestra", name: "core", version: kestraVersion diff --git a/src/main/java/io/kestra/plugin/mongodb/AbstractLoad.java b/src/main/java/io/kestra/plugin/mongodb/AbstractLoad.java index bdce90a..4e15f83 100644 --- a/src/main/java/io/kestra/plugin/mongodb/AbstractLoad.java +++ b/src/main/java/io/kestra/plugin/mongodb/AbstractLoad.java @@ -8,7 +8,6 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; -import io.reactivex.Flowable; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; @@ -23,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import jakarta.validation.constraints.NotNull; +import reactor.core.publisher.Flux; @SuperBuilder @ToString @@ -44,7 +44,7 @@ public abstract class AbstractLoad extends AbstractTask implements RunnableTask< @Builder.Default private Integer chunk = 1000; - abstract protected Flowable> source(RunContext runContext, BufferedReader inputStream); + abstract protected Flux> source(RunContext runContext, BufferedReader inputStream) throws Exception; @Override public Output run(RunContext runContext) throws Exception { @@ -63,7 +63,7 @@ public Output run(RunContext runContext) throws Exception { AtomicInteger modifiedCount = new AtomicInteger(); AtomicInteger deletedCount = new AtomicInteger(); - Flowable flowable = this.source(runContext, inputStream) + Flux flowable = this.source(runContext, inputStream) .doOnNext(docWriteRequest -> { count.incrementAndGet(); }) @@ -81,7 +81,7 @@ public Output run(RunContext runContext) throws Exception { }); // metrics & finalize - Long requestCount = flowable.count().blockingGet(); + Long requestCount = flowable.count().block(); runContext.metric(Counter.of( "requests.count", requestCount, "database", collection.getNamespace().getDatabaseName(), diff --git a/src/main/java/io/kestra/plugin/mongodb/Bulk.java b/src/main/java/io/kestra/plugin/mongodb/Bulk.java index 4e847ed..5275810 100644 --- a/src/main/java/io/kestra/plugin/mongodb/Bulk.java +++ b/src/main/java/io/kestra/plugin/mongodb/Bulk.java @@ -4,9 +4,6 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.runners.RunContext; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; -import io.reactivex.FlowableOnSubscribe; import io.swagger.v3.oas.annotations.media.Schema; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -16,9 +13,15 @@ import org.bson.BsonDocument; import org.bson.BsonValue; import org.bson.conversions.Bson; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import java.io.BufferedReader; +import java.io.IOException; import java.util.Map; +import java.util.function.Consumer; + +import static io.kestra.core.utils.Rethrow.throwConsumer; @SuperBuilder @ToString @@ -43,13 +46,13 @@ ) public class Bulk extends AbstractLoad { @Override - protected Flowable> source(RunContext runContext, BufferedReader inputStream) { - return Flowable - .create(this.ndJSonReader(inputStream), BackpressureStrategy.BUFFER); + protected Flux> source(RunContext runContext, BufferedReader inputStream) throws IOException { + return Flux + .create(this.ndJSonReader(inputStream), FluxSink.OverflowStrategy.BUFFER); } - public FlowableOnSubscribe> ndJSonReader(BufferedReader input) { - return s -> { + public Consumer>> ndJSonReader(BufferedReader input) throws IOException { + return throwConsumer(s -> { String row; while ((row = input.readLine()) != null) { @@ -96,10 +99,10 @@ public FlowableOnSubscribe> ndJSonReader(BufferedReader input) throw new IllegalArgumentException("Invalid bulk request type on '" + row + "'"); } - s.onNext(docWriteRequest); + s.next(docWriteRequest); } - s.onComplete(); - }; + s.complete(); + }); } } diff --git a/src/main/java/io/kestra/plugin/mongodb/Load.java b/src/main/java/io/kestra/plugin/mongodb/Load.java index 683ea6d..903417a 100644 --- a/src/main/java/io/kestra/plugin/mongodb/Load.java +++ b/src/main/java/io/kestra/plugin/mongodb/Load.java @@ -8,8 +8,6 @@ import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; import io.kestra.core.serializers.JacksonMapper; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; @@ -17,10 +15,14 @@ import org.bson.BsonObjectId; import org.bson.conversions.Bson; import org.bson.types.ObjectId; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import java.io.BufferedReader; import java.util.Map; +import static io.kestra.core.utils.Rethrow.throwFunction; + @SuperBuilder @ToString @EqualsAndHashCode @@ -58,10 +60,10 @@ public class Load extends AbstractLoad { @SuppressWarnings("unchecked") @Override - protected Flowable> source(RunContext runContext, BufferedReader inputStream) { - return Flowable - .create(FileSerde.reader(inputStream), BackpressureStrategy.BUFFER) - .map(o -> { + protected Flux> source(RunContext runContext, BufferedReader inputStream) throws Exception { + return Flux + .create(FileSerde.reader(inputStream), FluxSink.OverflowStrategy.BUFFER) + .map(throwFunction(o -> { Map values = (Map) o; if (this.idKey != null) { @@ -80,6 +82,6 @@ protected Flowable> source(RunContext runContext, BufferedReade return new InsertOneModel<>( BsonDocument.parse(JacksonMapper.ofJson().writeValueAsString(values)) ); - }); + })); } }