Skip to content

Commit

Permalink
refactor: migrate to dynamic properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Dec 4, 2024
1 parent 142b258 commit 26d2dcf
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -46,7 +47,7 @@ public abstract class AbstractLoad extends AbstractTask implements RunnableTask<
)
@PluginProperty(dynamic = true)
@Builder.Default
private Integer chunk = 1000;
private Property<Integer> chunk = Property.of(1000);

abstract protected Flux<BulkOperation> source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException, IOException;

Expand All @@ -67,7 +68,7 @@ public AbstractLoad.Output run(RunContext runContext) throws Exception {
.doOnNext(docWriteRequest -> {
count.incrementAndGet();
})
.buffer(this.chunk, this.chunk)
.buffer(runContext.render(this.chunk).as(Integer.class).orElseThrow(), runContext.render(this.chunk).as(Integer.class).orElseThrow())
.map(throwFunction(indexRequests -> {
var bulkRequest = new BulkRequest.Builder();
bulkRequest.operations(indexRequests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import co.elastic.clients.elasticsearch.core.SearchRequest;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.elasticsearch.model.XContentType;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -24,8 +25,7 @@ public abstract class AbstractSearch extends AbstractTask {
title = "The ElasticSearch indices.",
description = "Default to all indices."
)
@PluginProperty(dynamic = true)
private List<String> indexes;
private Property<List<String>> indexes;

@Schema(
title = "The ElasticSearch value.",
Expand All @@ -49,8 +49,8 @@ protected SearchRequest.Builder request(RunContext runContext) throws IllegalVar

request = QueryService.request(runContext, this.request);

if (this.indexes != null) {
request.index(runContext.render(this.indexes));
if (!runContext.render(this.indexes).asList(String.class).isEmpty()) {
request.index(runContext.render(this.indexes).asList(String.class));
}

if (this.routing != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.elasticsearch;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand All @@ -24,6 +25,5 @@ public abstract class AbstractTask extends Task {
title = "Controls the shard routing of the request.",
description = "Using this value to hash the shard and not the id."
)
@PluginProperty(dynamic = true)
protected String routing;
protected Property<String> routing;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -57,8 +58,7 @@ public class ElasticsearchConnection {
title = "List of HTTP headers to be send on every request.",
description = "Must be a string with key value separated with `:`, ex: `Authorization: Token XYZ`."
)
@PluginProperty(dynamic = true)
private List<String> headers;
private Property<List<String>> headers;

@Schema(
title = "Sets the path's prefix for every request used by the HTTP client.",
Expand All @@ -68,21 +68,18 @@ public class ElasticsearchConnection {
"or a proxy that requires all paths to start with '/'; it is not intended for other purposes and " +
"it should not be supplied in other scenarios."
)
@PluginProperty(dynamic = true)
private String pathPrefix;
private Property<String> pathPrefix;

@Schema(
title = "Whether the REST client should return any response containing at least one warning header as a failure."
)
@PluginProperty
private Boolean strictDeprecationMode;
private Property<Boolean> strictDeprecationMode;

@Schema(
title = "Trust all SSL CA certificates.",
description = "Use this if the server is using a self signed SSL certificate."
)
@PluginProperty
private Boolean trustAllSsl;
private Property<Boolean> trustAllSsl;

@SuperBuilder
@NoArgsConstructor
Expand All @@ -91,14 +88,12 @@ public static class BasicAuth {
@Schema(
title = "Basic auth username."
)
@PluginProperty(dynamic = true)
private String username;
private Property<String> username;

@Schema(
title = "Basic auth password."
)
@PluginProperty(dynamic = true)
private String password;
private Property<String> password;
}

RestClientTransport client(RunContext runContext) throws IllegalVariableEvaluationException {
Expand All @@ -113,12 +108,12 @@ RestClientTransport client(RunContext runContext) throws IllegalVariableEvaluati
builder.setDefaultHeaders(this.defaultHeaders(runContext));
}

if (this.getPathPrefix() != null) {
builder.setPathPrefix(runContext.render(this.pathPrefix));
if (runContext.render(this.pathPrefix).as(String.class).isPresent()) {
builder.setPathPrefix(runContext.render(this.pathPrefix).as(String.class).get());
}

if (this.getStrictDeprecationMode() != null) {
builder.setStrictDeprecationMode(this.getStrictDeprecationMode());
if (runContext.render(this.strictDeprecationMode).as(Boolean.class).isPresent()) {
builder.setStrictDeprecationMode(runContext.render(this.strictDeprecationMode).as(Boolean.class).get());
}

return new RestClientTransport(builder.build(), new JacksonJsonpMapper(MAPPER));
Expand All @@ -135,15 +130,15 @@ private HttpAsyncClientBuilder httpAsyncClientBuilder(RunContext runContext) {
basicCredential.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
runContext.render(this.basicAuth.username),
runContext.render(this.basicAuth.password)
runContext.render(this.basicAuth.username).as(String.class).orElseThrow(),
runContext.render(this.basicAuth.password).as(String.class).orElseThrow()
)
);

builder.setDefaultCredentialsProvider(basicCredential);
}

if (trustAllSsl != null && trustAllSsl) {
if (runContext.render(this.trustAllSsl).as(Boolean.class).orElse(false)) {
SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
sslContextBuilder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
SSLContext sslContext = sslContextBuilder.build();
Expand All @@ -166,7 +161,7 @@ private HttpHost[] httpHosts(RunContext runContext) throws IllegalVariableEvalua
}

private Header[] defaultHeaders(RunContext runContext) throws IllegalVariableEvaluationException {
return runContext.render(this.headers)
return runContext.render(this.headers).asList(String.class)
.stream()
.map(header -> {
String[] nameAndValue = header.split(":");
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/io/kestra/plugin/elasticsearch/Esql.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kestra.plugin.elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.esql.objects.ObjectsEsqlAdapter;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.elasticsearch.esql.query.EsqlFormat;
Expand All @@ -12,6 +11,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -61,7 +61,7 @@
- id: load
type: io.kestra.plugin.elasticsearch.Bulk
from: "{{ outputs.extract.uri }}"
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT5S
Expand Down Expand Up @@ -99,17 +99,15 @@ public class Esql extends AbstractTask implements RunnableTask<Esql.Output> {
+ "NONE do nothing."
)
@Builder.Default
@PluginProperty
@NotNull
private FetchType fetchType = FetchType.FETCH;
private Property<FetchType> fetchType = Property.of(FetchType.FETCH);

@Schema(
title = "The ElasticSearch value.",
description = "Can be a JSON string. In this case, the contentType will be used or a raw Map."
)
@PluginProperty(dynamic = true)
@NotNull
private String query;
private Property<String> query;

@Schema(
title = "Query filter.",
Expand All @@ -127,7 +125,7 @@ public Esql.Output run(RunContext runContext) throws Exception {

// build request
QueryRequest queryRequest = QueryRequest.of(throwFunction(builder -> {
builder.query(runContext.render(this.query));
builder.query(runContext.render(this.query).as(String.class).orElseThrow());
builder.format(EsqlFormat.Json);
builder.columnar(false);

Expand All @@ -148,7 +146,7 @@ public Esql.Output run(RunContext runContext) throws Exception {

Output.OutputBuilder outputBuilder = Esql.Output.builder();

switch (fetchType) {
switch (runContext.render(this.fetchType).as(FetchType.class).orElseThrow()) {
case FETCH:
Pair<List<Map<String, Object>>, Integer> fetch = this.fetch(queryResponse);
outputBuilder
Expand Down
25 changes: 9 additions & 16 deletions src/main/java/io/kestra/plugin/elasticsearch/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -37,7 +38,7 @@
- id: get
type: io.kestra.plugin.elasticsearch.Get
connection:
hosts:
hosts:
- "http://localhost:9200"
index: "my_index"
key: "my_id"
Expand All @@ -49,47 +50,39 @@ public class Get extends AbstractTask implements RunnableTask<Get.Output> {
@Schema(
title = "The ElasticSearch index."
)
@PluginProperty(dynamic = true)
@NotNull
private String index;
private Property<String> index;

@Schema(
title = "The ElasticSearch id."
)
@PluginProperty(dynamic = true)
@NotNull
private String key;
private Property<String> key;

@Schema(
title = "Sets the version",
description = "which will cause the get operation to only be performed if a matching version exists and no changes happened on the doc since then."
)
@PluginProperty
@NotNull
private Long version;

private Property<Long> version;

@Override
public Get.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
try (RestClientTransport transport = this.connection.client(runContext)) {
ElasticsearchClient client = new ElasticsearchClient(transport);
String index = runContext.render(this.index);
String key = runContext.render(this.key);
String index = runContext.render(this.index).as(String.class).orElseThrow();
String key = runContext.render(this.key).as(String.class).orElseThrow();

var request = new GetRequest.Builder();
request.index(index).id(key);

if (this.version != null) {
request.version(this.version);
}

if (this.routing != null) {
request.routing(this.routing);
request.version(runContext.render(this.version).as(Long.class).orElseThrow());
}

if (this.routing != null) {
request.routing(this.routing);
request.routing(runContext.render(this.routing).as(String.class).orElseThrow());
}

GetResponse<Map> response = client.get(request.build(), Map.class);
Expand Down
23 changes: 10 additions & 13 deletions src/main/java/io/kestra/plugin/elasticsearch/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -37,7 +38,7 @@
code = """
id: elasticsearch_load
namespace: company.team
inputs:
- id: file
type: FILE
Expand All @@ -46,7 +47,7 @@
- id: load
type: io.kestra.plugin.elasticsearch.Load
connection:
hosts:
hosts:
- "http://localhost:9200"
from: "{{ inputs.file }}"
index: "my_index"
Expand All @@ -59,28 +60,24 @@ public class Load extends AbstractLoad implements RunnableTask<Load.Output> {
@Schema(
title = "The elasticsearch index."
)
@PluginProperty(dynamic = true)
@NotNull
private String index;
private Property<String> index;

@Schema(
title = "Sets the type of operation to perform."
)
@PluginProperty
private OpType opType;
private Property<OpType> opType;

@Schema(
title = "Use this key as id."
)
@PluginProperty(dynamic = true)
private String idKey;
private Property<String> idKey;

@Schema(
title = "Remove idKey from the final document."
)
@PluginProperty(dynamic = true)
@Builder.Default
private Boolean removeIdKey = true;
private Property<Boolean> removeIdKey = Property.of(true);

@SuppressWarnings("unchecked")
@Override
Expand All @@ -91,7 +88,7 @@ protected Flux<BulkOperation> source(RunContext runContext, BufferedReader input

var indexRequest = new IndexOperation.Builder<Map<String, ?>>();
if (this.index != null) {
indexRequest.index(runContext.render(this.getIndex()));
indexRequest.index(runContext.render(this.getIndex()).as(String.class).orElseThrow());
}

//FIXME
Expand All @@ -100,11 +97,11 @@ protected Flux<BulkOperation> source(RunContext runContext, BufferedReader input
// }

if (this.idKey != null) {
String idKey = runContext.render(this.idKey);
String idKey = runContext.render(this.idKey).as(String.class).orElseThrow();

indexRequest.id(values.get(idKey).toString());

if (this.removeIdKey) {
if (runContext.render(this.removeIdKey).as(Boolean.class).orElse(true)) {
values.remove(idKey);
}
}
Expand Down
Loading

0 comments on commit 26d2dcf

Please sign in to comment.