From 92d5e1bbcefa1cc04ea7b900b4e2ac49fbc2c3c3 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Wed, 4 Dec 2024 10:36:55 +0100 Subject: [PATCH] refactor: migrate to dynamic properties --- .../plugin/elasticsearch/AbstractLoad.java | 5 +-- .../plugin/elasticsearch/AbstractSearch.java | 10 +++--- .../plugin/elasticsearch/AbstractTask.java | 4 +-- .../ElasticsearchConnection.java | 35 ++++++++---------- .../io/kestra/plugin/elasticsearch/Esql.java | 14 ++++---- .../io/kestra/plugin/elasticsearch/Get.java | 25 +++++-------- .../io/kestra/plugin/elasticsearch/Load.java | 23 ++++++------ .../io/kestra/plugin/elasticsearch/Put.java | 36 +++++++++---------- .../kestra/plugin/elasticsearch/Request.java | 25 ++++++------- .../kestra/plugin/elasticsearch/Search.java | 12 +++---- .../kestra/plugin/elasticsearch/BulkTest.java | 5 +-- .../kestra/plugin/elasticsearch/EsqlTest.java | 19 +++++----- .../kestra/plugin/elasticsearch/LoadTest.java | 7 ++-- .../plugin/elasticsearch/PutGetTest.java | 13 +++---- .../plugin/elasticsearch/RequestTest.java | 11 +++--- .../plugin/elasticsearch/ScrollTest.java | 5 +-- .../plugin/elasticsearch/SearchTest.java | 11 +++--- 17 files changed, 124 insertions(+), 136 deletions(-) diff --git a/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java b/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java index 3e9992d..958fcd1 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/AbstractLoad.java @@ -9,6 +9,7 @@ import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Timer; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; @@ -46,7 +47,7 @@ public abstract class AbstractLoad extends AbstractTask implements RunnableTask< ) @PluginProperty(dynamic = true) @Builder.Default - private Integer chunk = 1000; + private Property chunk = Property.of(1000); abstract protected Flux source(RunContext runContext, BufferedReader inputStream) throws IllegalVariableEvaluationException, IOException; @@ -67,7 +68,7 @@ public AbstractLoad.Output run(RunContext runContext) throws Exception { .doOnNext(docWriteRequest -> { count.incrementAndGet(); }) - .buffer(this.chunk, this.chunk) + .buffer(runContext.render(this.chunk).as(Integer.class).orElseThrow(), runContext.render(this.chunk).as(Integer.class).orElseThrow()) .map(throwFunction(indexRequests -> { var bulkRequest = new BulkRequest.Builder(); bulkRequest.operations(indexRequests); diff --git a/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java b/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java index 028a016..f2025b0 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java @@ -3,6 +3,7 @@ import co.elastic.clients.elasticsearch.core.SearchRequest; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.plugin.elasticsearch.model.XContentType; import io.swagger.v3.oas.annotations.media.Schema; @@ -24,8 +25,7 @@ public abstract class AbstractSearch extends AbstractTask { title = "The ElasticSearch indices.", description = "Default to all indices." ) - @PluginProperty(dynamic = true) - private List indexes; + private Property> indexes; @Schema( title = "The ElasticSearch value.", @@ -49,12 +49,12 @@ protected SearchRequest.Builder request(RunContext runContext) throws IllegalVar request = QueryService.request(runContext, this.request); - if (this.indexes != null) { - request.index(runContext.render(this.indexes)); + if (!runContext.render(this.indexes).asList(String.class).isEmpty()) { + request.index(runContext.render(this.indexes).asList(String.class)); } if (this.routing != null) { - request.routing(this.routing); + request.routing(runContext.render(this.routing).as(String.class).orElseThrow()); } return request; diff --git a/src/main/java/io/kestra/plugin/elasticsearch/AbstractTask.java b/src/main/java/io/kestra/plugin/elasticsearch/AbstractTask.java index 9d7e395..3cca2bc 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/AbstractTask.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/AbstractTask.java @@ -1,6 +1,7 @@ package io.kestra.plugin.elasticsearch; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Task; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; @@ -24,6 +25,5 @@ public abstract class AbstractTask extends Task { title = "Controls the shard routing of the request.", description = "Using this value to hash the shard and not the id." ) - @PluginProperty(dynamic = true) - protected String routing; + protected Property routing; } diff --git a/src/main/java/io/kestra/plugin/elasticsearch/ElasticsearchConnection.java b/src/main/java/io/kestra/plugin/elasticsearch/ElasticsearchConnection.java index 6259797..c2267d8 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/ElasticsearchConnection.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/ElasticsearchConnection.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; import io.swagger.v3.oas.annotations.media.Schema; @@ -57,8 +58,7 @@ public class ElasticsearchConnection { title = "List of HTTP headers to be send on every request.", description = "Must be a string with key value separated with `:`, ex: `Authorization: Token XYZ`." ) - @PluginProperty(dynamic = true) - private List headers; + private Property> headers; @Schema( title = "Sets the path's prefix for every request used by the HTTP client.", @@ -68,21 +68,18 @@ public class ElasticsearchConnection { "or a proxy that requires all paths to start with '/'; it is not intended for other purposes and " + "it should not be supplied in other scenarios." ) - @PluginProperty(dynamic = true) - private String pathPrefix; + private Property pathPrefix; @Schema( title = "Whether the REST client should return any response containing at least one warning header as a failure." ) - @PluginProperty - private Boolean strictDeprecationMode; + private Property strictDeprecationMode; @Schema( title = "Trust all SSL CA certificates.", description = "Use this if the server is using a self signed SSL certificate." ) - @PluginProperty - private Boolean trustAllSsl; + private Property trustAllSsl; @SuperBuilder @NoArgsConstructor @@ -91,14 +88,12 @@ public static class BasicAuth { @Schema( title = "Basic auth username." ) - @PluginProperty(dynamic = true) - private String username; + private Property username; @Schema( title = "Basic auth password." ) - @PluginProperty(dynamic = true) - private String password; + private Property password; } RestClientTransport client(RunContext runContext) throws IllegalVariableEvaluationException { @@ -113,12 +108,12 @@ RestClientTransport client(RunContext runContext) throws IllegalVariableEvaluati builder.setDefaultHeaders(this.defaultHeaders(runContext)); } - if (this.getPathPrefix() != null) { - builder.setPathPrefix(runContext.render(this.pathPrefix)); + if (runContext.render(this.pathPrefix).as(String.class).isPresent()) { + builder.setPathPrefix(runContext.render(this.pathPrefix).as(String.class).get()); } - if (this.getStrictDeprecationMode() != null) { - builder.setStrictDeprecationMode(this.getStrictDeprecationMode()); + if (runContext.render(this.strictDeprecationMode).as(Boolean.class).isPresent()) { + builder.setStrictDeprecationMode(runContext.render(this.strictDeprecationMode).as(Boolean.class).get()); } return new RestClientTransport(builder.build(), new JacksonJsonpMapper(MAPPER)); @@ -135,15 +130,15 @@ private HttpAsyncClientBuilder httpAsyncClientBuilder(RunContext runContext) { basicCredential.setCredentials( AuthScope.ANY, new UsernamePasswordCredentials( - runContext.render(this.basicAuth.username), - runContext.render(this.basicAuth.password) + runContext.render(this.basicAuth.username).as(String.class).orElseThrow(), + runContext.render(this.basicAuth.password).as(String.class).orElseThrow() ) ); builder.setDefaultCredentialsProvider(basicCredential); } - if (trustAllSsl != null && trustAllSsl) { + if (runContext.render(this.trustAllSsl).as(Boolean.class).orElse(false)) { SSLContextBuilder sslContextBuilder = new SSLContextBuilder(); sslContextBuilder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); SSLContext sslContext = sslContextBuilder.build(); @@ -166,7 +161,7 @@ private HttpHost[] httpHosts(RunContext runContext) throws IllegalVariableEvalua } private Header[] defaultHeaders(RunContext runContext) throws IllegalVariableEvaluationException { - return runContext.render(this.headers) + return runContext.render(this.headers).asList(String.class) .stream() .map(header -> { String[] nameAndValue = header.split(":"); diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Esql.java b/src/main/java/io/kestra/plugin/elasticsearch/Esql.java index 00598be..02781f1 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Esql.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Esql.java @@ -1,7 +1,6 @@ package io.kestra.plugin.elasticsearch; import co.elastic.clients.elasticsearch.ElasticsearchClient; -import co.elastic.clients.elasticsearch._helpers.esql.objects.ObjectsEsqlAdapter; import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.elasticsearch.esql.QueryRequest; import co.elastic.clients.elasticsearch.esql.query.EsqlFormat; @@ -12,6 +11,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.common.FetchType; import io.kestra.core.runners.RunContext; @@ -61,7 +61,7 @@ - id: load type: io.kestra.plugin.elasticsearch.Bulk from: "{{ outputs.extract.uri }}" - + - id: sleep type: io.kestra.plugin.core.flow.Sleep duration: PT5S @@ -99,17 +99,15 @@ public class Esql extends AbstractTask implements RunnableTask { + "NONE do nothing." ) @Builder.Default - @PluginProperty @NotNull - private FetchType fetchType = FetchType.FETCH; + private Property fetchType = Property.of(FetchType.FETCH); @Schema( title = "The ElasticSearch value.", description = "Can be a JSON string. In this case, the contentType will be used or a raw Map." ) - @PluginProperty(dynamic = true) @NotNull - private String query; + private Property query; @Schema( title = "Query filter.", @@ -127,7 +125,7 @@ public Esql.Output run(RunContext runContext) throws Exception { // build request QueryRequest queryRequest = QueryRequest.of(throwFunction(builder -> { - builder.query(runContext.render(this.query)); + builder.query(runContext.render(this.query).as(String.class).orElseThrow()); builder.format(EsqlFormat.Json); builder.columnar(false); @@ -148,7 +146,7 @@ public Esql.Output run(RunContext runContext) throws Exception { Output.OutputBuilder outputBuilder = Esql.Output.builder(); - switch (fetchType) { + switch (runContext.render(this.fetchType).as(FetchType.class).orElseThrow()) { case FETCH: Pair>, Integer> fetch = this.fetch(queryResponse); outputBuilder diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Get.java b/src/main/java/io/kestra/plugin/elasticsearch/Get.java index b8b666d..dab5c4b 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Get.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Get.java @@ -7,6 +7,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -37,7 +38,7 @@ - id: get type: io.kestra.plugin.elasticsearch.Get connection: - hosts: + hosts: - "http://localhost:9200" index: "my_index" key: "my_id" @@ -49,47 +50,39 @@ public class Get extends AbstractTask implements RunnableTask { @Schema( title = "The ElasticSearch index." ) - @PluginProperty(dynamic = true) @NotNull - private String index; + private Property index; @Schema( title = "The ElasticSearch id." ) - @PluginProperty(dynamic = true) @NotNull - private String key; + private Property key; @Schema( title = "Sets the version", description = "which will cause the get operation to only be performed if a matching version exists and no changes happened on the doc since then." ) - @PluginProperty @NotNull - private Long version; - + private Property version; @Override public Get.Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); try (RestClientTransport transport = this.connection.client(runContext)) { ElasticsearchClient client = new ElasticsearchClient(transport); - String index = runContext.render(this.index); - String key = runContext.render(this.key); + String index = runContext.render(this.index).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); var request = new GetRequest.Builder(); request.index(index).id(key); if (this.version != null) { - request.version(this.version); - } - - if (this.routing != null) { - request.routing(this.routing); + request.version(runContext.render(this.version).as(Long.class).orElseThrow()); } if (this.routing != null) { - request.routing(this.routing); + request.routing(runContext.render(this.routing).as(String.class).orElseThrow()); } GetResponse response = client.get(request.build(), Map.class); diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Load.java b/src/main/java/io/kestra/plugin/elasticsearch/Load.java index d7046d7..38d377f 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Load.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Load.java @@ -6,6 +6,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; @@ -37,7 +38,7 @@ code = """ id: elasticsearch_load namespace: company.team - + inputs: - id: file type: FILE @@ -46,7 +47,7 @@ - id: load type: io.kestra.plugin.elasticsearch.Load connection: - hosts: + hosts: - "http://localhost:9200" from: "{{ inputs.file }}" index: "my_index" @@ -59,28 +60,24 @@ public class Load extends AbstractLoad implements RunnableTask { @Schema( title = "The elasticsearch index." ) - @PluginProperty(dynamic = true) @NotNull - private String index; + private Property index; @Schema( title = "Sets the type of operation to perform." ) - @PluginProperty - private OpType opType; + private Property opType; @Schema( title = "Use this key as id." ) - @PluginProperty(dynamic = true) - private String idKey; + private Property idKey; @Schema( title = "Remove idKey from the final document." ) - @PluginProperty(dynamic = true) @Builder.Default - private Boolean removeIdKey = true; + private Property removeIdKey = Property.of(true); @SuppressWarnings("unchecked") @Override @@ -91,7 +88,7 @@ protected Flux source(RunContext runContext, BufferedReader input var indexRequest = new IndexOperation.Builder>(); if (this.index != null) { - indexRequest.index(runContext.render(this.getIndex())); + indexRequest.index(runContext.render(this.getIndex()).as(String.class).orElseThrow()); } //FIXME @@ -100,11 +97,11 @@ protected Flux source(RunContext runContext, BufferedReader input // } if (this.idKey != null) { - String idKey = runContext.render(this.idKey); + String idKey = runContext.render(this.idKey).as(String.class).orElseThrow(); indexRequest.id(values.get(idKey).toString()); - if (this.removeIdKey) { + if (runContext.render(this.removeIdKey).as(Boolean.class).orElse(true)) { values.remove(idKey); } } diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Put.java b/src/main/java/io/kestra/plugin/elasticsearch/Put.java index b13ac32..fe2bbc9 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Put.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Put.java @@ -11,6 +11,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; @@ -41,12 +42,12 @@ code = """ id: elasticsearch_put namespace: company.team - + tasks: - id: put type: io.kestra.plugin.elasticsearch.Put connection: - hosts: + hosts: - "http://localhost:9200" index: "my_index" key: "my_id" @@ -61,17 +62,17 @@ code = """ id: elasticsearch_put namespace: company.team - + inputs: - id: value type: JSON defaults: {"name": "John Doe", "city": "Paris"} - + tasks: - id: put type: io.kestra.plugin.elasticsearch.Put connection: - hosts: + hosts: - "http://localhost:9200" index: "my_index" key: "my_id" @@ -86,21 +87,18 @@ public class Put extends AbstractTask implements RunnableTask { @Schema( title = "The elasticsearch index." ) - @PluginProperty(dynamic = true) @NotNull - private String index; + private Property index; @Schema( title = "Sets the type of operation to perform." ) - @PluginProperty - private OpType opType; + private Property opType; @Schema( title = "The elasticsearch id." ) - @PluginProperty(dynamic = true) - private String key; + private Property key; @Schema( title = "The elasticsearch value.", @@ -113,24 +111,22 @@ public class Put extends AbstractTask implements RunnableTask { title = "Should this request trigger a refresh.", description = "an immediate refresh `IMMEDIATE`, wait for a refresh `WAIT_UNTIL`, or proceed ignore refreshes entirely `NONE`." ) - @PluginProperty @Builder.Default - private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private Property refreshPolicy = Property.of(RefreshPolicy.NONE); @Schema( title = "The content type of `value`." ) - @PluginProperty @Builder.Default - private XContentType contentType = XContentType.JSON; + private Property contentType = Property.of(XContentType.JSON); @Override public Put.Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); try (RestClientTransport transport = this.connection.client(runContext)) { ElasticsearchClient client = new ElasticsearchClient(transport); - String index = runContext.render(this.index); - String key = runContext.render(this.key); + String index = runContext.render(this.index).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); var request = new IndexRequest.Builder(); request.index(index); @@ -143,15 +139,15 @@ public Put.Output run(RunContext runContext) throws Exception { } if (this.opType != null) { - request.opType(this.opType.to()); + request.opType(runContext.render(this.opType).as(OpType.class).orElseThrow().to()); } if (this.refreshPolicy != null) { - request.refresh(this.refreshPolicy.to()); + request.refresh(runContext.render(this.refreshPolicy).as(RefreshPolicy.class).orElseThrow().to()); } if (this.routing != null) { - request.routing(this.routing); + request.routing(runContext.render(this.routing).as(String.class).orElseThrow()); } logger.debug("Putting doc: {}", request); diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Request.java b/src/main/java/io/kestra/plugin/elasticsearch/Request.java index dfdaceb..c762e4b 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Request.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Request.java @@ -5,11 +5,13 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; import io.kestra.plugin.elasticsearch.model.HttpMethod; import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.io.IOUtils; @@ -43,7 +45,7 @@ - id: request_post type: io.kestra.plugin.elasticsearch.Request connection: - hosts: + hosts: - "http://localhost:9200" method: "POST" endpoint: "my_index/_doc/john" @@ -62,7 +64,7 @@ - id: request_get type: io.kestra.plugin.elasticsearch.Request connection: - hosts: + hosts: - "http://localhost:9200" method: "GET" endpoint: "my_index/_search" @@ -94,20 +96,18 @@ public class Request extends AbstractTask implements RunnableTask method = Property.of(HttpMethod.GET); @Schema( title = "The path of the request (without scheme, host, port, or prefix)." ) - @PluginProperty(dynamic = true) - protected String endpoint; + @NotNull + protected Property endpoint; @Schema( title = "Query string parameters." ) - @PluginProperty(dynamic = true, additionalProperties = String.class) - protected Map parameters; + protected Property> parameters; @Schema( title = "The full body.", @@ -121,14 +121,15 @@ public Request.Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); try (RestClientTransport transport = this.connection.client(runContext)) { org.elasticsearch.client.Request request = new org.elasticsearch.client.Request( - method.name(), - runContext.render(endpoint) + runContext.render(method).as(HttpMethod.class).orElseThrow().name(), + runContext.render(endpoint).as(String.class).orElseThrow() ); if (this.parameters != null) { - this.parameters.entrySet() + runContext.render(this.parameters).asMap(String.class, String.class) + .entrySet() .forEach(throwConsumer(e -> { - request.addParameter(runContext.render(e.getKey()), runContext.render(e.getValue())); + request.addParameter(e.getKey(), e.getValue()); })); } diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Search.java b/src/main/java/io/kestra/plugin/elasticsearch/Search.java index 99333f4..5bd6bc2 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/Search.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/Search.java @@ -9,6 +9,7 @@ import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Timer; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.common.FetchType; import io.kestra.core.runners.RunContext; @@ -45,17 +46,17 @@ code = """ id: elasticsearch_search namespace: company.team - + tasks: - id: search type: io.kestra.plugin.elasticsearch.Search connection: - hosts: + hosts: - "http://localhost:9200" indexes: - "my_index" request: - query: + query: term: name: value: 'john' @@ -72,8 +73,7 @@ public class Search extends AbstractSearch implements RunnableTask fetchType = Property.of(FetchType.FETCH); @Override public Search.Output run(RunContext runContext) throws Exception { @@ -89,7 +89,7 @@ public Search.Output run(RunContext runContext) throws Exception { Output.OutputBuilder outputBuilder = Search.Output.builder(); - switch (fetchType) { + switch (runContext.render(fetchType).as(FetchType.class).orElseThrow()) { case FETCH: Pair>, Integer> fetch = this.fetch(searchResponse); outputBuilder diff --git a/src/test/java/io/kestra/plugin/elasticsearch/BulkTest.java b/src/test/java/io/kestra/plugin/elasticsearch/BulkTest.java index 4c15c68..42e8935 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/BulkTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/BulkTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.elasticsearch; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.serializers.FileSerde; @@ -70,7 +71,7 @@ void run() throws Exception { Bulk put = Bulk.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) .from(uri.toString()) - .chunk(10) + .chunk(Property.of(10)) .build(); Bulk.Output runOutput = put.run(runContext); @@ -97,7 +98,7 @@ void runIon() throws Exception { Bulk put = Bulk.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) .from(uri.toString()) - .chunk(10) + .chunk(Property.of(10)) .build(); Bulk.Output runOutput = put.run(runContext); diff --git a/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java b/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java index a891fe8..3ccf290 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.elasticsearch; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.common.FetchType; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; @@ -37,10 +38,10 @@ void run() throws Exception { Esql task = Esql.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .query(""" + .query(Property.of(""" FROM gbif | WHERE key == 925277090 - """) + """)) .build(); Esql.Output run = task.run(runContext); @@ -55,7 +56,7 @@ void filter() throws Exception { Esql task = Esql.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .query("FROM gbif") + .query(Property.of("FROM gbif")) .filter(""" { "query": { @@ -78,11 +79,11 @@ void runFetchOne() throws Exception { Esql task = Esql.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .query(""" + .query(Property.of(""" FROM gbif | WHERE publishingCountry.keyword == "BE" - """) - .fetchType(FetchType.FETCH_ONE) + """)) + .fetchType(Property.of(FetchType.FETCH_ONE)) .build(); Esql.Output run = task.run(runContext); @@ -99,12 +100,12 @@ void runStored() throws Exception { Esql task = Esql.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .query(""" + .query(Property.of(""" FROM gbif | WHERE publishingCountry.keyword == "BE" | LIMIT 10 - """) - .fetchType(FetchType.STORE) + """)) + .fetchType(Property.of(FetchType.STORE)) .build(); Esql.Output run = task.run(runContext); diff --git a/src/test/java/io/kestra/plugin/elasticsearch/LoadTest.java b/src/test/java/io/kestra/plugin/elasticsearch/LoadTest.java index 589b54d..3735a1d 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/LoadTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/LoadTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.elasticsearch; import com.google.common.collect.ImmutableMap; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.serializers.FileSerde; @@ -51,10 +52,10 @@ void run() throws Exception { Load put = Load.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .index(indice) + .index(Property.of(indice)) .from(uri.toString()) - .chunk(10) - .idKey("id") + .chunk(Property.of(10)) + .idKey(Property.of("id")) .build(); Load.Output runOutput = put.run(runContext); diff --git a/src/test/java/io/kestra/plugin/elasticsearch/PutGetTest.java b/src/test/java/io/kestra/plugin/elasticsearch/PutGetTest.java index 197612c..96cec14 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/PutGetTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/PutGetTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.elasticsearch; import com.google.common.collect.ImmutableMap; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.utils.IdUtils; @@ -31,7 +32,7 @@ void run() throws Exception { Put put = Put.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .index(indice) + .index(Property.of(indice)) .value("{{ variable }}") .build(); @@ -39,8 +40,8 @@ void run() throws Exception { Get task = Get.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .index(indice) - .key(putOutput.getId()) + .index(Property.of(indice)) + .key(Property.of(putOutput.getId())) .build(); Get.Output runOutput = task.run(runContext); @@ -49,7 +50,7 @@ void run() throws Exception { put = Put.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .index(indice) + .index(Property.of(indice)) .value(Map.of( "name", "Jane Doe" )) @@ -59,8 +60,8 @@ void run() throws Exception { task = Get.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .index(indice) - .key(putOutput.getId()) + .index(Property.of(indice)) + .key(Property.of(putOutput.getId())) .build(); runOutput = task.run(runContext); diff --git a/src/test/java/io/kestra/plugin/elasticsearch/RequestTest.java b/src/test/java/io/kestra/plugin/elasticsearch/RequestTest.java index b62b0c6..c0496f6 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/RequestTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/RequestTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.elasticsearch; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.kestra.core.utils.IdUtils; @@ -33,9 +34,9 @@ void run() throws Exception { Request request = Request.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .method(HttpMethod.POST) - .endpoint(indice + "/_doc/" + IdUtils.create()) - .parameters(Map.of("human", "true")) + .method(Property.of(HttpMethod.POST)) + .endpoint(Property.of(indice + "/_doc/" + IdUtils.create())) + .parameters(Property.of(Map.of("human", "true"))) .body(Map.of("name", "john")) .build(); @@ -51,8 +52,8 @@ void cat() throws Exception { Request request = Request.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .method(HttpMethod.GET) - .endpoint("_cat/indices") + .method(Property.of(HttpMethod.GET)) + .endpoint(Property.of("_cat/indices")) .build(); Request.Output runOutput = request.run(runContext); diff --git a/src/test/java/io/kestra/plugin/elasticsearch/ScrollTest.java b/src/test/java/io/kestra/plugin/elasticsearch/ScrollTest.java index 37b44b1..730e3c0 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/ScrollTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/ScrollTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.elasticsearch; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.micronaut.context.annotation.Value; @@ -27,7 +28,7 @@ void run() throws Exception { Scroll task = Scroll.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .indexes(Collections.singletonList("gbif")) + .indexes(Property.of(Collections.singletonList("gbif"))) .request(""" { "query": { @@ -49,7 +50,7 @@ void runFull() throws Exception { Scroll task = Scroll.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .indexes(Collections.singletonList("gbif")) + .indexes(Property.of(Collections.singletonList("gbif"))) .request(""" { "query": { diff --git a/src/test/java/io/kestra/plugin/elasticsearch/SearchTest.java b/src/test/java/io/kestra/plugin/elasticsearch/SearchTest.java index 6643b09..66b0d58 100644 --- a/src/test/java/io/kestra/plugin/elasticsearch/SearchTest.java +++ b/src/test/java/io/kestra/plugin/elasticsearch/SearchTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.elasticsearch; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.common.FetchType; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; @@ -38,7 +39,7 @@ void run() throws Exception { Search task = Search.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .indexes(Collections.singletonList("gbif")) + .indexes(Property.of(Collections.singletonList("gbif"))) .request(""" { "query": { @@ -61,7 +62,7 @@ void runFetchOne() throws Exception { Search task = Search.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .indexes(Collections.singletonList("gbif")) + .indexes(Property.of(Collections.singletonList("gbif"))) .request(""" { "query": { @@ -73,7 +74,7 @@ void runFetchOne() throws Exception { "key": "asc" } }""") - .fetchType(FetchType.FETCH_ONE) + .fetchType(Property.of(FetchType.FETCH_ONE)) .build(); Search.Output run = task.run(runContext); @@ -90,7 +91,7 @@ void runStored() throws Exception { Search task = Search.builder() .connection(ElasticsearchConnection.builder().hosts(hosts).build()) - .indexes(Collections.singletonList("gbif")) + .indexes(Property.of(Collections.singletonList("gbif"))) .request(""" { "query": { @@ -99,7 +100,7 @@ void runStored() throws Exception { } } }""") - .fetchType(FetchType.STORE) + .fetchType(Property.of(FetchType.STORE)) .build(); Search.Output run = task.run(runContext);