Skip to content

Commit

Permalink
refactor: migrate to dynamic properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Dec 4, 2024
1 parent 70fe73c commit 47956bf
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 68 deletions.
12 changes: 6 additions & 6 deletions src/main/java/io/kestra/plugin/mongodb/AbstractLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.mongodb.client.model.WriteModel;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -34,23 +35,21 @@ public abstract class AbstractLoad extends AbstractTask implements RunnableTask<
@Schema(
title = "The source file."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

@Schema(
title = "Chunk size for every bulk request."
)
@PluginProperty(dynamic = true)
@Builder.Default
private Integer chunk = 1000;
private Property<Integer> chunk = Property.of(1000);

abstract protected Flux<WriteModel<Bson>> source(RunContext runContext, BufferedReader inputStream) throws Exception;

@Override
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
URI from = new URI(runContext.render(this.from));
URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());

try (
MongoClient client = this.connection.client(runContext);
Expand All @@ -64,11 +63,12 @@ public Output run(RunContext runContext) throws Exception {
AtomicInteger modifiedCount = new AtomicInteger();
AtomicInteger deletedCount = new AtomicInteger();

var renderedChunk = runContext.render(this.chunk).as(Integer.class).orElse(null);
Flux<BulkWriteResult> flowable = this.source(runContext, inputStream)
.doOnNext(docWriteRequest -> {
count.incrementAndGet();
})
.buffer(this.chunk, this.chunk)
.buffer(renderedChunk, renderedChunk)
.map(indexRequests -> {
List<WriteModel<Bson>> bulkOperations = new ArrayList<>(indexRequests);

Expand Down
11 changes: 5 additions & 6 deletions src/main/java/io/kestra/plugin/mongodb/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.mongodb.client.MongoDatabase;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -32,25 +33,23 @@ public abstract class AbstractTask extends Task {
@Schema(
title = "MongoDB database."
)
@PluginProperty(dynamic = true)
@NotNull
protected String database;
protected Property<String> database;

@Schema(
title = "MongoDB collection."
)
@PluginProperty(dynamic = true)
@NotNull
protected String collection;
protected Property<String> collection;

protected MongoCollection<Bson> collection(RunContext runContext, MongoClient client) throws IllegalVariableEvaluationException {
return this.collection(runContext, client, Bson.class);
}

protected <T> MongoCollection<T> collection(RunContext runContext, MongoClient client, Class<T> cls) throws IllegalVariableEvaluationException {
MongoDatabase database = client.getDatabase(runContext.render(this.database));
MongoDatabase database = client.getDatabase(runContext.render(this.database).as(String.class).orElseThrow());
return database.getCollection(
runContext.render(this.collection),
runContext.render(this.collection).as(String.class).orElseThrow(),
cls
);
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/kestra/plugin/mongodb/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -60,10 +61,9 @@ public class Delete extends AbstractTask implements RunnableTask<Delete.Output>
@Schema(
title = "Operation to use."
)
@PluginProperty(dynamic = false)
@Builder.Default
@NotNull
private Operation operation = Operation.DELETE_ONE;
private Property<Operation> operation = Property.of(Operation.DELETE_ONE);

@Override
public Delete.Output run(RunContext runContext) throws Exception {
Expand All @@ -75,7 +75,7 @@ public Delete.Output run(RunContext runContext) throws Exception {
BsonDocument bsonFilter = MongoDbService.toDocument(runContext, this.filter);

DeleteResult deleteResult;
if (this.operation == Operation.DELETE_ONE) {
if (Operation.DELETE_ONE.equals(runContext.render(this.operation).as(Operation.class).orElseThrow())) {
deleteResult = collection.deleteOne(bsonFilter);
} else {
deleteResult = collection.deleteMany(bsonFilter);
Expand Down
22 changes: 10 additions & 12 deletions src/main/java/io/kestra/plugin/mongodb/Find.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -43,7 +44,7 @@
code = """
id: mongodb_find
namespace: company.team
tasks:
- id: find
type: io.kestra.plugin.mongodb.Find
Expand Down Expand Up @@ -83,22 +84,19 @@ public class Find extends AbstractTask implements RunnableTask<Find.Output> {
@Schema(
title = "The number of records to return."
)
@PluginProperty(dynamic = true)
private Integer limit;
private Property<Integer> limit;

@Schema(
title = "The number of records to skip."
)
@PluginProperty(dynamic = true)
private Integer skip;
private Property<Integer> skip;


@Schema(
title = "Whether to store the data from the query result into an ion serialized data file."
)
@PluginProperty
@Builder.Default
private Boolean store = false;
private Property<Boolean> store = Property.of(false);

@Override
public Find.Output run(RunContext runContext) throws Exception {
Expand All @@ -120,17 +118,17 @@ public Find.Output run(RunContext runContext) throws Exception {
find.sort(MongoDbService.toDocument(runContext, this.sort));
}

if (this.limit != null) {
find.limit(this.limit);
if (runContext.render(this.limit).as(Integer.class).isPresent()) {
find.limit(runContext.render(this.limit).as(Integer.class).get());
}

if (this.skip != null) {
find.skip(this.skip);
if (runContext.render(this.skip).as(Integer.class).isPresent()) {
find.skip(runContext.render(this.skip).as(Integer.class).get());
}

Output.OutputBuilder builder = Output.builder();

if (this.store) {
if (runContext.render(this.store).as(Boolean.class).orElseThrow()) {
Pair<URI, Long> store = this.store(runContext, find);

builder
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/io/kestra/plugin/mongodb/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
Expand Down Expand Up @@ -41,7 +42,7 @@
inputs:
- id: file
type: FILE
tasks:
- id: load
type: io.kestra.plugin.mongodb.Load
Expand All @@ -58,15 +59,13 @@ public class Load extends AbstractLoad {
@Schema(
title = "Use this key as ID."
)
@PluginProperty(dynamic = true)
private String idKey;
private Property<String> idKey;

@Schema(
title = "Whether to remove idKey from the final document."
)
@PluginProperty(dynamic = true)
@Builder.Default
private Boolean removeIdKey = true;
private Property<Boolean> removeIdKey = Property.of(true);

@SuppressWarnings("unchecked")
@Override
Expand All @@ -75,15 +74,15 @@ protected Flux<WriteModel<Bson>> source(RunContext runContext, BufferedReader in
.map(throwFunction(o -> {
Map<String, Object> values = (Map<String, Object>) o;

if (this.idKey != null) {
String idKey = runContext.render(this.idKey);
if (runContext.render(this.idKey).as(String.class).isPresent()) {
String idKey = runContext.render(this.idKey).as(String.class).get();

values.put(
"_id",
new BsonObjectId(new ObjectId(values.get(idKey).toString()))
);

if (this.removeIdKey) {
if (runContext.render(this.removeIdKey).as(Boolean.class).orElseThrow()) {
values.remove(idKey);
}
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/kestra/plugin/mongodb/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -39,7 +40,7 @@
- id: return
type: io.kestra.plugin.core.debug.Return
format: "{{ json(taskrun.value) }}"
triggers:
- id: watch
type: io.kestra.plugin.mongodb.Trigger
Expand Down Expand Up @@ -68,22 +69,22 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,

private MongoDbConnection connection;

private String database;
private Property<String> database;

private String collection;
private Property<String> collection;

private Object filter;

private Object projection;

private Object sort;

private Integer limit;
private Property<Integer> limit;

private Integer skip;
private Property<Integer> skip;

@Builder.Default
private Boolean store = false;
private Property<Boolean> store = Property.of(false);

@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/kestra/plugin/mongodb/Update.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -35,7 +36,7 @@
code = """
id: mongodb_update
namespace: company.team
tasks:
- id: update
type: io.kestra.plugin.mongodb.Update
Expand All @@ -60,7 +61,7 @@
code = """
id: mongodb_update
namespace: company.team
tasks:
- id: update
type: io.kestra.plugin.mongodb.Update
Expand Down Expand Up @@ -96,9 +97,8 @@ public class Update extends AbstractTask implements RunnableTask<Update.Output>
@Schema(
title = "Operation to use."
)
@PluginProperty(dynamic = false)
@Builder.Default
private Operation operation = Operation.UPDATE_ONE;
private Property<Operation> operation = Property.of(Operation.UPDATE_ONE);

@Override
public Update.Output run(RunContext runContext) throws Exception {
Expand All @@ -111,9 +111,9 @@ public Update.Output run(RunContext runContext) throws Exception {
BsonDocument bsonFilter = MongoDbService.toDocument(runContext, this.filter);

UpdateResult updateResult;
if (this.operation == Operation.REPLACE_ONE) {
if (Operation.REPLACE_ONE.equals(runContext.render(operation).as(Operation.class).orElseThrow())) {
updateResult = collection.replaceOne(bsonFilter, bsonDocument);
} else if (this.operation == Operation.UPDATE_ONE) {
} else if (Operation.UPDATE_ONE.equals(runContext.render(operation).as(Operation.class).orElseThrow())) {
updateResult = collection.updateOne(bsonFilter, bsonDocument);
} else {
updateResult = collection.updateMany(bsonFilter, bsonDocument);
Expand Down
9 changes: 5 additions & 4 deletions src/test/java/io/kestra/plugin/mongodb/BulkTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.mongodb;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
Expand Down Expand Up @@ -55,10 +56,10 @@ void run() throws Exception {
.connection(MongoDbConnection.builder()
.uri("mongodb://root:example@localhost:27017/?authSource=admin")
.build())
.database(database)
.collection("bulk")
.from(uri.toString())
.chunk(10)
.database(Property.of(database))
.collection(Property.of("bulk"))
.from(Property.of(uri.toString()))
.chunk(Property.of(10))
.build();

Bulk.Output runOutput = put.run(runContext);
Expand Down
Loading

0 comments on commit 47956bf

Please sign in to comment.