Skip to content

Commit

Permalink
feat(*): Migrate from RxJava2 to Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 29, 2024
1 parent 64f600e commit a65c702
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 23 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"
compileOnly "io.micronaut:micronaut-jackson-databind"
compileOnly "io.micronaut.rxjava2:micronaut-rxjava2"
compileOnly "io.micronaut.reactor:micronaut-reactor"

// kestra
compileOnly group: "io.kestra", name: "core", version: kestraVersion
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/kestra/plugin/mongodb/AbstractLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -23,6 +22,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Flux;

@SuperBuilder
@ToString
Expand All @@ -44,7 +44,7 @@ public abstract class AbstractLoad extends AbstractTask implements RunnableTask<
@Builder.Default
private Integer chunk = 1000;

abstract protected Flowable<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream);
abstract protected Flux<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream) throws Exception;

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -63,7 +63,7 @@ public Output run(RunContext runContext) throws Exception {
AtomicInteger modifiedCount = new AtomicInteger();
AtomicInteger deletedCount = new AtomicInteger();

Flowable<BulkWriteResult> flowable = this.source(runContext, inputStream)
Flux<BulkWriteResult> flowable = this.source(runContext, inputStream)
.doOnNext(docWriteRequest -> {
count.incrementAndGet();
})
Expand All @@ -81,7 +81,7 @@ public Output run(RunContext runContext) throws Exception {
});

// metrics & finalize
Long requestCount = flowable.count().blockingGet();
Long requestCount = flowable.count().block();
runContext.metric(Counter.of(
"requests.count", requestCount,
"database", collection.getNamespace().getDatabaseName(),
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/io/kestra/plugin/mongodb/Bulk.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -16,9 +13,15 @@
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
@ToString
Expand All @@ -43,13 +46,13 @@
)
public class Bulk extends AbstractLoad {
@Override
protected Flowable<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream) {
return Flowable
.create(this.ndJSonReader(inputStream), BackpressureStrategy.BUFFER);
protected Flux<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream) throws IOException {
return Flux
.create(this.ndJSonReader(inputStream), FluxSink.OverflowStrategy.BUFFER);
}

public FlowableOnSubscribe<WriteModel<Bson>> ndJSonReader(BufferedReader input) {
return s -> {
public Consumer<FluxSink<WriteModel<Bson>>> ndJSonReader(BufferedReader input) throws IOException {
return throwConsumer(s -> {
String row;

while ((row = input.readLine()) != null) {
Expand Down Expand Up @@ -96,10 +99,10 @@ public FlowableOnSubscribe<WriteModel<Bson>> ndJSonReader(BufferedReader input)
throw new IllegalArgumentException("Invalid bulk request type on '" + row + "'");
}

s.onNext(docWriteRequest);
s.next(docWriteRequest);
}

s.onComplete();
};
s.complete();
});
}
}
16 changes: 9 additions & 7 deletions src/main/java/io/kestra/plugin/mongodb/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.BufferedReader;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -58,10 +60,10 @@ public class Load extends AbstractLoad {

@SuppressWarnings("unchecked")
@Override
protected Flowable<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream) {
return Flowable
.create(FileSerde.reader(inputStream), BackpressureStrategy.BUFFER)
.map(o -> {
protected Flux<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream) throws Exception {
return Flux
.create(FileSerde.reader(inputStream), FluxSink.OverflowStrategy.BUFFER)
.map(throwFunction(o -> {
Map<String, Object> values = (Map<String, Object>) o;

if (this.idKey != null) {
Expand All @@ -80,6 +82,6 @@ protected Flowable<WriteModel<Bson>> source(RunContext runContext, BufferedReade
return new InsertOneModel<>(
BsonDocument.parse(JacksonMapper.ofJson().writeValueAsString(values))
);
});
}));
}
}

0 comments on commit a65c702

Please sign in to comment.